* Make plugin hadoop2-only

Polish MiniDFS cluster to be Hadoop2 (instead of Hadoop1) based
Costin Leau 2015-12-19 01:26:58 +02:00
parent af11707da0
commit 7584810ff4
17 changed files with 377 additions and 292 deletions

View File

@ -24,51 +24,23 @@ esplugin {
classname 'org.elasticsearch.plugin.hadoop.hdfs.HdfsPlugin'
}
configurations {
hadoop1
hadoop2
}
versions << [
'hadoop1': '1.2.1',
'hadoop2': '2.7.1'
]
configurations {
hadoop2
}
dependencies {
provided "org.elasticsearch:elasticsearch:${versions.elasticsearch}"
provided "org.apache.hadoop:hadoop-core:${versions.hadoop1}"
// use Hadoop1 to compile and test things (a subset of Hadoop2)
testCompile "org.apache.hadoop:hadoop-core:${versions.hadoop1}"
testCompile "org.apache.hadoop:hadoop-test:${versions.hadoop1}"
// Hadoop dependencies
testCompile "commons-configuration:commons-configuration:1.6"
testCompile "commons-lang:commons-lang:${versions.commonslang}"
testCompile "commons-collections:commons-collections:3.2.2"
testCompile "commons-net:commons-net:1.4.1"
testCompile "org.mortbay.jetty:jetty:6.1.26"
testCompile "org.mortbay.jetty:jetty-util:6.1.26"
testCompile "org.mortbay.jetty:servlet-api:2.5-20081211"
testCompile "com.sun.jersey:jersey-core:1.8"
hadoop1("org.apache.hadoop:hadoop-core:${versions.hadoop1}") {
exclude module: "commons-cli"
exclude group: "com.sun.jersey"
exclude group: "org.mortbay.jetty"
exclude group: "tomcat"
exclude module: "commons-el"
exclude module: "hsqldb"
exclude group: "org.eclipse.jdt"
exclude module: "commons-beanutils"
exclude module: "commons-beanutils-core"
exclude module: "junit"
// provided by ES itself
exclude group: "log4j"
}
hadoop2("org.apache.hadoop:hadoop-client:${versions.hadoop2}") {
exclude module: "commons-cli"
hadoop2 ("org.apache.hadoop:hadoop-client:${versions.hadoop2}") {
exclude module: "hadoop-yarn-common"
exclude module: "hadoop-mapreduce-client-app"
exclude module: "hadoop-mapreduce-client-core"
exclude module: "hadoop-mapreduce-client-jobclient"
exclude module: "hadoop-yarn-api"
exclude group: "commons-cli"
exclude group: "com.sun.jersey"
exclude group: "com.sun.jersey.contribs"
exclude group: "com.sun.jersey.jersey-test-framework"
@ -82,37 +54,57 @@ dependencies {
exclude module: "commons-beanutils-core"
exclude module: "javax.servlet"
exclude module: "junit"
exclude module: "netty"
// provided by ES itself
exclude group: "log4j"
}
hadoop2("org.apache.hadoop:hadoop-hdfs:${versions.hadoop2}") {
hadoop2 ("org.apache.hadoop:hadoop-hdfs:${versions.hadoop2}") {
// prevent jar hell
exclude module: "hadoop-yarn-common"
exclude module: "commons-cli"
exclude module: "netty"
exclude module: "guava"
exclude module: "junit"
// provided by ES itself
exclude group: "log4j"
}
}
provided "org.elasticsearch:elasticsearch:${versions.elasticsearch}"
provided configurations.hadoop2
testCompile ("org.apache.hadoop:hadoop-hdfs:${versions.hadoop2}:tests") {
exclude module: "commons-cli"
exclude module: "netty"
}
testCompile ("org.apache.hadoop:hadoop-common:${versions.hadoop2}:tests") {
exclude module: "commons-cli"
}
}
configurations.all {
// used due to _transitive_ configuration
resolutionStrategy {
force "commons-cli:commons-cli:1.3.1"
force "io.netty:netty:3.10.5.Final"
force "commons-codec:commons-codec:${versions.commonscodec}"
force "commons-logging:commons-logging:${versions.commonslogging}"
force "commons-lang:commons-lang:2.6"
force "commons-httpclient:commons-httpclient:3.0.1"
force "org.codehaus.jackson:jackson-core-asl:1.8.8"
force "org.codehaus.jackson:jackson-mapper-asl:1.8.8"
force "org.codehaus.jackson:jackson-core-asl:1.9.13"
force "org.codehaus.jackson:jackson-mapper-asl:1.9.13"
force "com.google.code.findbugs:jsr305:3.0.0"
force "com.google.guava:guava:16.0.1"
force "org.slf4j:slf4j-api:1.7.10"
force "org.slf4j:slf4j-log4j12:1.7.10"
force "org.slf4j:slf4j-api:${versions.slf4j}"
force "org.slf4j:slf4j-log4j12:${versions.slf4j}"
force "junit:junit:${versions.junit}"
force "org.apache.httpcomponents:httpclient:4.3.6"
force "log4j:log4j:${versions.log4j}"
}
}
dependencyLicenses {
mapping from: /hadoop-core.*/, to: 'hadoop-1'
mapping from: /hadoop-.*/, to: 'hadoop-2'
mapping from: /hadoop-.*/, to: 'hadoop'
}
compileJava.options.compilerArgs << '-Xlint:-deprecation,-rawtypes'
@ -145,62 +137,31 @@ bundlePlugin {
}
}
task distZipHadoop1(type: Zip, dependsOn: [hadoopLinkedJar, jar]) { zipTask ->
from (zipTree(bundlePlugin.archivePath)) {
include "*"
include "internal-libs/**"
}
description = "Builds archive (with Hadoop1 dependencies) suitable for download page."
classifier = "hadoop1"
into ("hadoop-libs") {
from configurations.hadoop1.allArtifacts.files
from configurations.hadoop1
}
task miniHdfsStart(type: JavaExec) {
classpath = sourceSets.test.compileClasspath + sourceSets.test.output
main = "org.elasticsearch.plugin.hadoop.hdfs.MiniHDFS"
errorOutput = new FileOutputStream("build/minihdfs.err")
standardOutput = new FileOutputStream("build/minihdfs.out")
//ext.hdfsPid = (main as Class).getPid
}
task distZipHadoop2(type: Zip, dependsOn: [hadoopLinkedJar, jar]) { zipTask ->
from (zipTree(bundlePlugin.archivePath)) {
include "*"
include "internal-libs/**"
}
description = "Builds archive (with Hadoop2/YARN dependencies) suitable for download page."
classifier = "hadoop2"
//task miniHdfsStop(type: org.elasticsearch.gradle.LoggedExec) {
// onlyIf { hdfsPid > -1 }
// if (Os.isFamily(Os.FAMILY_WINDOWS)) {
// executable 'Taskkill'
// args '/PID', hdfsCluster.pid, '/F'
// } else {
// executable 'kill'
// args '-9', hdfsCluster.pid
// }
//}
into ("hadoop-libs") {
from configurations.hadoop2.allArtifacts.files
from configurations.hadoop2
}
}
task distZipNoHadoop(type: Zip, dependsOn: [hadoopLinkedJar, jar]) { zipTask ->
from (zipTree(bundlePlugin.archivePath)) {
exclude "hadoop-libs/**"
}
from sourceSets.main.output.resourcesDir
description = "Builds archive (without any Hadoop dependencies) suitable for download page."
classifier = "lite"
}
//integTest.dependsOn(miniHdfsStart)
//integTest.finalizedBy(miniHdfsStop)
thirdPartyAudit.enabled = false
artifacts {
archives bundlePlugin
'default' bundlePlugin
archives distZipHadoop1
archives distZipHadoop2
archives distZipNoHadoop
}
integTest {
cluster {
plugin(pluginProperties.extension.name, zipTree(distZipHadoop2.archivePath))
}
}
// classes are missing, e.g. org.mockito.Mockito
thirdPartyAudit.missingClasses = true
}

View File

@ -18,16 +18,6 @@
*/
package org.elasticsearch.plugin.hadoop.hdfs;
import org.elasticsearch.SpecialPermission;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.io.FileSystemUtils;
import org.elasticsearch.common.io.PathUtils;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardRepository;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.repositories.RepositoriesModule;
import org.elasticsearch.repositories.Repository;
import java.io.IOException;
import java.lang.reflect.Method;
import java.net.URI;
@ -41,13 +31,23 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import org.elasticsearch.SpecialPermission;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.io.FileSystemUtils;
import org.elasticsearch.common.io.PathUtils;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardRepository;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.repositories.RepositoriesModule;
import org.elasticsearch.repositories.Repository;
//
// Note this plugin is somewhat special as Hadoop itself loads a number of libraries and thus requires a number of permissions to run even in client mode.
// This poses two problems:
// - Hadoop itself comes with tons of jars, many providing the same classes across packages. In particular Hadoop 2 provides package annotations in the same
// package across jars which trips JarHell. Thus, to allow Hadoop jars to load, the plugin uses a dedicated CL which picks them up from the hadoop-libs folder.
// - The issue though with using a different CL is that it picks up the jars from a different location / codeBase and thus it does not fall under the plugin
// permissions. In other words, the plugin permissions don't apply to the hadoop libraries.
// There are different approaches here:
// - implement a custom classloader that loads the jars but 'lies' about the codesource. It is doable but since URLClassLoader is locked down, one
// would have to implement the whole jar opening and loading from it. Not impossible but still fairly low-level.
@ -64,7 +64,7 @@ import java.util.Locale;
// - package plugin.hadoop.hdfs is part of the plugin
// - all the other packages are assumed to be in the nested Hadoop CL.
// Code
public class HdfsPlugin extends Plugin {
@Override
@ -81,7 +81,7 @@ public class HdfsPlugin extends Plugin {
public void onModule(RepositoriesModule repositoriesModule) {
String baseLib = Utils.detectLibFolder();
List<URL> cp = getHadoopClassLoaderPath(baseLib);
ClassLoader hadoopCL = URLClassLoader.newInstance(cp.toArray(new URL[cp.size()]), getClass().getClassLoader());
Class<? extends Repository> repository = null;
@ -170,4 +170,4 @@ public class HdfsPlugin extends Plugin {
}
}
}
}
}
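A condensed sketch of the dedicated-classloader approach described in the HdfsPlugin comments above (illustrative only, not part of this commit; the method and class names are taken from the surrounding code, but the exact loading call is an assumption):

List<URL> cp = getHadoopClassLoaderPath(Utils.detectLibFolder());
ClassLoader hadoopCL = URLClassLoader.newInstance(cp.toArray(new URL[cp.size()]), HdfsPlugin.class.getClassLoader());
// the Hadoop-facing repository implementation is loaded through the dedicated classloader...
Class<?> repository = Class.forName("org.elasticsearch.repositories.hdfs.HdfsRepository", true, hadoopCL);
// ...and calls into it are wrapped in doPrivileged blocks using the AccessControlContext from Utils.hadoopACC()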

View File

@ -1,5 +1,3 @@
package org.elasticsearch.plugin.hadoop.hdfs;
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
@ -18,8 +16,7 @@ package org.elasticsearch.plugin.hadoop.hdfs;
* specific language governing permissions and limitations
* under the License.
*/
import org.elasticsearch.SpecialPermission;
package org.elasticsearch.plugin.hadoop.hdfs;
import java.net.URL;
import java.security.AccessControlContext;
@ -28,6 +25,8 @@ import java.security.DomainCombiner;
import java.security.PrivilegedAction;
import java.security.ProtectionDomain;
import org.elasticsearch.SpecialPermission;
public abstract class Utils {
protected static AccessControlContext hadoopACC() {
@ -100,4 +99,4 @@ public abstract class Utils {
return base;
}
}
}

View File

@ -19,11 +19,11 @@
package org.elasticsearch.repositories.hdfs;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileContext;
import java.io.IOException;
interface FsCallback<V> {
interface FcCallback<V> {
V doInHdfs(FileSystem fs) throws IOException;
V doInHdfs(FileContext fc) throws IOException;
}

View File

@ -18,11 +18,11 @@
*/
package org.elasticsearch.repositories.hdfs;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileContext;
import java.io.IOException;
interface FileSystemFactory {
interface FileContextFactory {
FileSystem getFileSystem() throws IOException;
FileContext getFileContext() throws IOException;
}
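As a usage sketch (illustrative, not part of this commit), the two interfaces are combined through SecurityUtils.execute so that every FileContext call runs under the Hadoop AccessControlContext; fileContextFactory below is a hypothetical variable of type FileContextFactory:

Boolean exists = SecurityUtils.execute(fileContextFactory, new FcCallback<Boolean>() {
    @Override
    public Boolean doInHdfs(FileContext fc) throws IOException {
        // mirrors the pattern used throughout HdfsBlobContainer and HdfsBlobStore
        return fc.util().exists(new Path("/some/path"));
    }
});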

View File

@ -18,24 +18,27 @@
*/
package org.elasticsearch.repositories.hdfs;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Options.CreateOpts;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.Syncable;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.blobstore.BlobMetaData;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.support.AbstractBlobContainer;
import org.elasticsearch.common.blobstore.support.PlainBlobMetaData;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.Streams;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Collections;
import java.util.EnumSet;
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;
public class HdfsBlobContainer extends AbstractBlobContainer {
@ -52,10 +55,10 @@ public class HdfsBlobContainer extends AbstractBlobContainer {
@Override
public boolean blobExists(String blobName) {
try {
return SecurityUtils.execute(blobStore.fileSystemFactory(), new FsCallback<Boolean>() {
return SecurityUtils.execute(blobStore.fileContextFactory(), new FcCallback<Boolean>() {
@Override
public Boolean doInHdfs(FileSystem fs) throws IOException {
return fs.exists(new Path(path, blobName));
public Boolean doInHdfs(FileContext fc) throws IOException {
return fc.util().exists(new Path(path, blobName));
}
});
} catch (Exception e) {
@ -65,46 +68,77 @@ public class HdfsBlobContainer extends AbstractBlobContainer {
@Override
public void deleteBlob(String blobName) throws IOException {
SecurityUtils.execute(blobStore.fileSystemFactory(), new FsCallback<Boolean>() {
SecurityUtils.execute(blobStore.fileContextFactory(), new FcCallback<Boolean>() {
@Override
public Boolean doInHdfs(FileSystem fs) throws IOException {
return fs.delete(new Path(path, blobName), true);
public Boolean doInHdfs(FileContext fc) throws IOException {
return fc.delete(new Path(path, blobName), true);
}
});
}
@Override
public void move(String sourceBlobName, String targetBlobName) throws IOException {
boolean rename = SecurityUtils.execute(blobStore.fileSystemFactory(), new FsCallback<Boolean>() {
SecurityUtils.execute(blobStore.fileContextFactory(), new FcCallback<Void>() {
@Override
public Boolean doInHdfs(FileSystem fs) throws IOException {
return fs.rename(new Path(path, sourceBlobName), new Path(path, targetBlobName));
public Void doInHdfs(FileContext fc) throws IOException {
// _try_ to hsync the file before renaming it
// since append is optional, this is a best effort
Path source = new Path(path, sourceBlobName);
// try-with-resources is nice, but since append is optional it's hard to figure out
// what worked and what didn't.
// it's okay to not be able to append to the file, but not okay if hsync fails
// classic try / catch to the rescue
FSDataOutputStream stream = null;
try {
stream = fc.create(source, EnumSet.of(CreateFlag.APPEND, CreateFlag.SYNC_BLOCK), CreateOpts.donotCreateParent());
} catch (IOException ex) {
// append is optional, ignore
}
if (stream != null) {
try (OutputStream s = stream) {
if (s instanceof Syncable) {
((Syncable) s).hsync();
}
}
}
// finally rename
fc.rename(source, new Path(path, targetBlobName));
return null;
}
});
if (!rename) {
throw new IOException(String.format(Locale.ROOT, "can not move blob from [%s] to [%s]", sourceBlobName, targetBlobName));
}
}
@Override
public InputStream readBlob(String blobName) throws IOException {
// FSDataInputStream does buffering internally
return SecurityUtils.execute(blobStore.fileSystemFactory(), new FsCallback<InputStream>() {
return SecurityUtils.execute(blobStore.fileContextFactory(), new FcCallback<InputStream>() {
@Override
public InputStream doInHdfs(FileSystem fs) throws IOException {
return fs.open(new Path(path, blobName), blobStore.bufferSizeInBytes());
public InputStream doInHdfs(FileContext fc) throws IOException {
return fc.open(new Path(path, blobName), blobStore.bufferSizeInBytes());
}
});
}
@Override
public void writeBlob(String blobName, InputStream inputStream, long blobSize) throws IOException {
SecurityUtils.execute(blobStore.fileSystemFactory(), new FsCallback<Void>() {
SecurityUtils.execute(blobStore.fileContextFactory(), new FcCallback<Void>() {
@Override
public Void doInHdfs(FileSystem fs) throws IOException {
try (OutputStream stream = createOutput(blobName)) {
Streams.copy(inputStream, stream);
public Void doInHdfs(FileContext fc) throws IOException {
// copy manually (instead of using Streams.copy) so that hsync can be called on the output stream
// note that the inputstream is NOT closed here for two reasons:
// 1. it is closed already by ES after executing this method
// 2. closing the stream twice causes Hadoop to issue WARNING messages which are basically noise
// see https://issues.apache.org/jira/browse/HDFS-8099
try (FSDataOutputStream stream = createOutput(fc, blobName)) {
int bytesRead;
byte[] buffer = new byte[blobStore.bufferSizeInBytes()];
while ((bytesRead = inputStream.read(buffer)) != -1) {
stream.write(buffer, 0, bytesRead);
}
stream.hsync();
}
return null;
}
@ -113,34 +147,34 @@ public class HdfsBlobContainer extends AbstractBlobContainer {
@Override
public void writeBlob(String blobName, BytesReference bytes) throws IOException {
SecurityUtils.execute(blobStore.fileSystemFactory(), new FsCallback<Void>() {
SecurityUtils.execute(blobStore.fileContextFactory(), new FcCallback<Void>() {
@Override
public Void doInHdfs(FileSystem fs) throws IOException {
try (OutputStream stream = createOutput(blobName)) {
public Void doInHdfs(FileContext fc) throws IOException {
try (FSDataOutputStream stream = createOutput(fc, blobName)) {
bytes.writeTo(stream);
stream.hsync();
}
return null;
}
});
}
private OutputStream createOutput(String blobName) throws IOException {
Path file = new Path(path, blobName);
// FSDataOutputStream does buffering internally
return blobStore.fileSystemFactory().getFileSystem().create(file, true, blobStore.bufferSizeInBytes());
private FSDataOutputStream createOutput(FileContext fc, String blobName) throws IOException {
return fc.create(new Path(path, blobName), EnumSet.of(CreateFlag.CREATE, CreateFlag.SYNC_BLOCK),
CreateOpts.bufferSize(blobStore.bufferSizeInBytes()), CreateOpts.createParent());
}
@Override
public Map<String, BlobMetaData> listBlobsByPrefix(final @Nullable String blobNamePrefix) throws IOException {
FileStatus[] files = SecurityUtils.execute(blobStore.fileSystemFactory(), new FsCallback<FileStatus[]>() {
FileStatus[] files = SecurityUtils.execute(blobStore.fileContextFactory(), new FcCallback<FileStatus[]>() {
@Override
public FileStatus[] doInHdfs(FileSystem fs) throws IOException {
return fs.listStatus(path, new PathFilter() {
public FileStatus[] doInHdfs(FileContext fc) throws IOException {
return (!fc.util().exists(path) ? null : fc.util().listStatus(path, new PathFilter() {
@Override
public boolean accept(Path path) {
return path.getName().startsWith(blobNamePrefix);
}
});
}));
}
});
if (files == null || files.length == 0) {
@ -155,10 +189,10 @@ public class HdfsBlobContainer extends AbstractBlobContainer {
@Override
public Map<String, BlobMetaData> listBlobs() throws IOException {
FileStatus[] files = SecurityUtils.execute(blobStore.fileSystemFactory(), new FsCallback<FileStatus[]>() {
FileStatus[] files = SecurityUtils.execute(blobStore.fileContextFactory(), new FcCallback<FileStatus[]>() {
@Override
public FileStatus[] doInHdfs(FileSystem fs) throws IOException {
return fs.listStatus(path);
public FileStatus[] doInHdfs(FileContext fc) throws IOException {
return (!fc.util().exists(path) ? null : fc.util().listStatus(path));
}
});
if (files == null || files.length == 0) {

View File

@ -18,7 +18,7 @@
*/
package org.elasticsearch.repositories.hdfs;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.blobstore.BlobContainer;
@ -35,14 +35,14 @@ import java.util.concurrent.Executor;
public class HdfsBlobStore extends AbstractComponent implements BlobStore {
private final FileSystemFactory ffs;
private final FileContextFactory fcf;
private final Path rootHdfsPath;
private final ThreadPool threadPool;
private final int bufferSizeInBytes;
public HdfsBlobStore(Settings settings, FileSystemFactory ffs, Path path, ThreadPool threadPool) throws IOException {
public HdfsBlobStore(Settings settings, FileContextFactory fcf, Path path, ThreadPool threadPool) throws IOException {
super(settings);
this.ffs = ffs;
this.fcf = fcf;
this.rootHdfsPath = path;
this.threadPool = threadPool;
@ -52,11 +52,11 @@ public class HdfsBlobStore extends AbstractComponent implements BlobStore {
}
private void mkdirs(Path path) throws IOException {
SecurityUtils.execute(ffs, new FsCallback<Void>() {
SecurityUtils.execute(fcf, new FcCallback<Void>() {
@Override
public Void doInHdfs(FileSystem fs) throws IOException {
if (!fs.exists(path)) {
fs.mkdirs(path);
public Void doInHdfs(FileContext fc) throws IOException {
if (!fc.util().exists(path)) {
fc.mkdir(path, null, true);
}
return null;
}
@ -68,8 +68,8 @@ public class HdfsBlobStore extends AbstractComponent implements BlobStore {
return rootHdfsPath.toUri().toString();
}
public FileSystemFactory fileSystemFactory() {
return ffs;
public FileContextFactory fileContextFactory() {
return fcf;
}
public Path path() {
@ -91,10 +91,10 @@ public class HdfsBlobStore extends AbstractComponent implements BlobStore {
@Override
public void delete(BlobPath path) throws IOException {
SecurityUtils.execute(ffs, new FsCallback<Void>() {
SecurityUtils.execute(fcf, new FcCallback<Void>() {
@Override
public Void doInHdfs(FileSystem fs) throws IOException {
fs.delete(translateToHdfsPath(path), true);
public Void doInHdfs(FileContext fc) throws IOException {
fc.delete(translateToHdfsPath(path), true);
return null;
}
});

View File

@ -19,9 +19,9 @@
package org.elasticsearch.repositories.hdfs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.AbstractFileSystem;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchGenerationException;
@ -51,7 +51,7 @@ import java.util.Locale;
import java.util.Map;
import java.util.Map.Entry;
public class HdfsRepository extends BlobStoreRepository implements FileSystemFactory {
public class HdfsRepository extends BlobStoreRepository implements FileContextFactory {
public final static String TYPE = "hdfs";
@ -60,7 +60,7 @@ public class HdfsRepository extends BlobStoreRepository implements FileSystemFac
private final ByteSizeValue chunkSize;
private final boolean compress;
private final RepositorySettings repositorySettings;
private FileSystem fs;
private FileContext fc;
@Inject
public HdfsRepository(RepositoryName name, RepositorySettings repositorySettings, IndexShardRepository indexShardRepository, ThreadPool threadPool) throws IOException {
@ -74,16 +74,16 @@ public class HdfsRepository extends BlobStoreRepository implements FileSystemFac
}
// get configuration
fs = getFileSystem();
Path hdfsPath = SecurityUtils.execute(fs, new FsCallback<Path>() {
fc = getFileContext();
Path hdfsPath = SecurityUtils.execute(fc, new FcCallback<Path>() {
@Override
public Path doInHdfs(FileSystem fs) throws IOException {
return fs.makeQualified(new Path(path));
public Path doInHdfs(FileContext fc) throws IOException {
return fc.makeQualified(new Path(path));
}
});
this.basePath = BlobPath.cleanPath();
logger.debug("Using file-system [{}] for URI [{}], path [{}]", fs, fs.getUri(), hdfsPath);
logger.debug("Using file-system [{}] for URI [{}], path [{}]", fc.getDefaultFileSystem(), fc.getDefaultFileSystem().getUri(), hdfsPath);
blobStore = new HdfsBlobStore(settings, this, hdfsPath, threadPool);
this.chunkSize = repositorySettings.settings().getAsBytesSize("chunk_size", settings.getAsBytesSize("chunk_size", null));
this.compress = repositorySettings.settings().getAsBoolean("compress", settings.getAsBoolean("compress", false));
@ -91,7 +91,7 @@ public class HdfsRepository extends BlobStoreRepository implements FileSystemFac
// as the FileSystem is long-lived and might go away, make sure to check it before it's being used.
@Override
public FileSystem getFileSystem() throws IOException {
public FileContext getFileContext() throws IOException {
SecurityManager sm = System.getSecurityManager();
if (sm != null) {
// unprivileged code such as scripts do not have SpecialPermission
@ -99,10 +99,10 @@ public class HdfsRepository extends BlobStoreRepository implements FileSystemFac
}
try {
return AccessController.doPrivileged(new PrivilegedExceptionAction<FileSystem>() {
return AccessController.doPrivileged(new PrivilegedExceptionAction<FileContext>() {
@Override
public FileSystem run() throws IOException {
return doGetFileSystem();
public FileContext run() throws IOException {
return doGetFileContext();
}
}, SecurityUtils.AccBridge.acc());
} catch (PrivilegedActionException pae) {
@ -120,37 +120,37 @@ public class HdfsRepository extends BlobStoreRepository implements FileSystemFac
}
}
private FileSystem doGetFileSystem() throws IOException {
private FileContext doGetFileContext() throws IOException {
// check if the fs is still alive
// make a cheap call that triggers little to no security checks
if (fs != null) {
if (fc != null) {
try {
fs.isFile(fs.getWorkingDirectory());
fc.util().exists(fc.getWorkingDirectory());
} catch (IOException ex) {
if (ex.getMessage().contains("Filesystem closed")) {
fs = null;
fc = null;
}
else {
throw ex;
}
}
}
if (fs == null) {
if (fc == null) {
Thread th = Thread.currentThread();
ClassLoader oldCL = th.getContextClassLoader();
try {
th.setContextClassLoader(getClass().getClassLoader());
return initFileSystem(repositorySettings);
return initFileContext(repositorySettings);
} catch (IOException ex) {
throw ex;
} finally {
th.setContextClassLoader(oldCL);
}
}
return fs;
return fc;
}
private FileSystem initFileSystem(RepositorySettings repositorySettings) throws IOException {
private FileContext initFileContext(RepositorySettings repositorySettings) throws IOException {
Configuration cfg = new Configuration(repositorySettings.settings().getAsBoolean("load_defaults", settings.getAsBoolean("load_defaults", true)));
cfg.setClassLoader(this.getClass().getClassLoader());
@ -175,15 +175,16 @@ public class HdfsRepository extends BlobStoreRepository implements FileSystemFac
}
String uri = repositorySettings.settings().get("uri", settings.get("uri"));
URI actualUri = (uri != null ? URI.create(uri) : FileSystem.getDefaultUri(cfg));
String user = repositorySettings.settings().get("user", settings.get("user"));
URI actualUri = (uri != null ? URI.create(uri) : null);
try {
// disable FS cache
String disableFsCache = String.format(Locale.ROOT, "fs.%s.impl.disable.cache", actualUri.getScheme());
cfg.setBoolean(disableFsCache, true);
return (user != null ? FileSystem.get(actualUri, cfg, user) : FileSystem.get(actualUri, cfg));
// create the AFS manually since going through FileContext relies on Subject.doAs for no reason at all
AbstractFileSystem fs = AbstractFileSystem.get(actualUri, cfg);
return FileContext.getFileContext(fs, cfg);
} catch (Exception ex) {
throw new ElasticsearchGenerationException(String.format(Locale.ROOT, "Cannot create Hdfs file-system for uri [%s]", actualUri), ex);
}
@ -253,7 +254,8 @@ public class HdfsRepository extends BlobStoreRepository implements FileSystemFac
protected void doClose() throws ElasticsearchException {
super.doClose();
IOUtils.closeStream(fs);
fs = null;
// TODO: FileContext does not support any close - is there really no way
// to handle it?
fc = null;
}
}
}

View File

@ -19,7 +19,7 @@
package org.elasticsearch.repositories.hdfs;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileContext;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.SpecialPermission;
import org.elasticsearch.plugin.hadoop.hdfs.Utils;
@ -38,11 +38,11 @@ class SecurityUtils {
}
}
static <V> V execute(FileSystemFactory ffs, FsCallback<V> callback) throws IOException {
return execute(ffs.getFileSystem(), callback);
static <V> V execute(FileContextFactory fcf, FcCallback<V> callback) throws IOException {
return execute(fcf.getFileContext(), callback);
}
static <V> V execute(FileSystem fs, FsCallback<V> callback) throws IOException {
static <V> V execute(FileContext fc, FcCallback<V> callback) throws IOException {
SecurityManager sm = System.getSecurityManager();
if (sm != null) {
// unprivileged code such as scripts do not have SpecialPermission
@ -53,7 +53,7 @@ class SecurityUtils {
return AccessController.doPrivileged(new PrivilegedExceptionAction<V>() {
@Override
public V run() throws IOException {
return callback.doInHdfs(fs);
return callback.doInHdfs(fc);
}
}, AccBridge.acc());
} catch (PrivilegedActionException pae) {

View File

@ -28,40 +28,34 @@ grant {
permission java.lang.RuntimePermission "setContextClassLoader";
//
// Hadoop 1
// Hadoop 2
//
// UserGroupInformation (UGI)
// UserGroupInformation (UGI) Metrics
permission java.lang.RuntimePermission "accessDeclaredMembers";
permission java.lang.reflect.ReflectPermission "suppressAccessChecks";
// Shell initialization - reading system props
permission java.util.PropertyPermission "*", "read,write";
// UGI triggers JAAS
permission javax.security.auth.AuthPermission "getSubject";
// JAAS libraries are not loaded with the proper context in Hadoop, hence why the permission is needed here
permission java.lang.RuntimePermission "loadLibrary.jaas_nt";
// which triggers the use of the Kerberos library
permission java.lang.RuntimePermission "accessClassInPackage.sun.security.krb5";
// plus LoginContext
permission javax.security.auth.AuthPermission "modifyPrincipals";
permission javax.security.auth.AuthPermission "modifyPublicCredentials";
permission javax.security.auth.AuthPermission "modifyPrivateCredentials";
//
// Hadoop 2
//
// UGI (Ugi Metrics)
permission java.lang.RuntimePermission "accessDeclaredMembers";
// Shell initialization - reading system props
permission java.util.PropertyPermission "*", "read,write";
//permission javax.security.auth.PrivateCredentialPermission "org.apache.hadoop.security.Credentials \"*\"", "read";
permission javax.security.auth.PrivateCredentialPermission "org.apache.hadoop.security.Credentials \"*\"", "read";
permission javax.security.auth.AuthPermission "doAs";
// HftpFileSystem (all present FS are loaded and initialized at startup ...)
permission java.lang.RuntimePermission "setFactory";
};
// DFSClient init (metrics again)
permission java.lang.RuntimePermission "shutdownHooks";
};

View File

@ -1,5 +1,3 @@
package org.elasticsearch.plugin.hadoop.hdfs;
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
@ -18,17 +16,20 @@ package org.elasticsearch.plugin.hadoop.hdfs;
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.plugin.hadoop.hdfs;
import java.io.IOException;
import java.util.Collection;
import com.carrotsearch.randomizedtesting.annotations.Name;
import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;
import org.elasticsearch.plugin.hadoop.hdfs.HdfsPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.rest.ESRestTestCase;
import org.elasticsearch.test.rest.RestTestCandidate;
import org.elasticsearch.test.rest.parser.RestTestParseException;
import java.io.IOException;
import java.util.Collection;
public class HdfsRepositoryRestIT extends ESRestTestCase {
@Override

View File

@ -1,5 +1,3 @@
package org.elasticsearch.plugin.hadoop.hdfs;
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
@ -18,6 +16,7 @@ package org.elasticsearch.plugin.hadoop.hdfs;
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.plugin.hadoop.hdfs;
import java.net.URL;
import java.util.Collections;

View File

@ -32,7 +32,6 @@ import org.elasticsearch.snapshots.SnapshotState;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
import org.elasticsearch.test.ESIntegTestCase.Scope;
import org.elasticsearch.test.ESIntegTestCase.ThirdParty;
import org.elasticsearch.test.store.MockFSDirectoryService;
import org.junit.After;
import org.junit.Before;
@ -45,7 +44,11 @@ import static org.hamcrest.Matchers.greaterThan;
/**
* You must specify {@code -Dtests.thirdparty=true}
*/
@ThirdParty
// Make sure to start the MiniHDFS cluster first,
// otherwise one will get a weird PrivateCredentialPermission exception
// caused by the HDFS fallback code (which doesn't do much anyway)
// @ThirdParty
@ClusterScope(scope = Scope.SUITE, numDataNodes = 1, transportClientRatio = 0.0)
public class HdfsTests extends ESIntegTestCase {
@ -75,10 +78,12 @@ public class HdfsTests extends ESIntegTestCase {
}
private String path;
private int port;
@Before
public final void wipeBefore() throws Exception {
wipeRepositories();
port = MiniHDFS.getPort();
path = "build/data/repo-" + randomInt();
}
@ -94,9 +99,9 @@ public class HdfsTests extends ESIntegTestCase {
PutRepositoryResponse putRepositoryResponse = client.admin().cluster().preparePutRepository("test-repo")
.setType("hdfs")
.setSettings(Settings.settingsBuilder()
//.put("uri", "hdfs://127.0.0.1:51227")
.put("uri", "hdfs://127.0.0.1:" + port)
.put("conf.fs.es-hdfs.impl", TestingFs.class.getName())
.put("uri", "es-hdfs://./build/")
// .put("uri", "es-hdfs:///")
.put("path", path)
.put("conf", "additional-cfg.xml, conf-2.xml")
.put("chunk_size", randomIntBetween(100, 1000) + "k")
@ -178,9 +183,9 @@ public class HdfsTests extends ESIntegTestCase {
PutRepositoryResponse putRepositoryResponse = client.admin().cluster().preparePutRepository("test-repo")
.setType("hdfs")
.setSettings(Settings.settingsBuilder()
// .put("uri", "hdfs://127.0.0.1:51227/")
.put("conf.fs.es-hdfs.impl", TestingFs.class.getName())
.put("uri", "es-hdfs:///")
.put("uri", "hdfs://127.0.0.1:" + port)
// .put("uri", "es-hdfs:///")
.put("conf.fs.es-hdfs.impl", TestingFs.class.getName())
.put("path", path + "a@b$c#11:22")
.put("chunk_size", randomIntBetween(100, 1000) + "k")
.put("compress", randomBoolean()))
@ -215,4 +220,4 @@ public class HdfsTests extends ESIntegTestCase {
private long count(Client client, String index) {
return client.prepareSearch(index).setSize(0).get().getHits().totalHits();
}
}
}

View File

@ -0,0 +1,114 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.plugin.hadoop.hdfs;
import com.carrotsearch.randomizedtesting.RandomizedTest;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.elasticsearch.common.io.PathUtils;
import org.elasticsearch.monitor.jvm.JvmInfo;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Locale;
public class MiniHDFS {
private static volatile MiniDFSCluster dfs;
private static String PORT_FILE_NAME = "minihdfs.port";
private static String PID_FILE_NAME = "minihdfs.pid";
public static void main(String[] args) throws Exception {
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
MiniHDFS.stop();
}
});
start();
}
public static int start() throws IOException {
if (dfs != null) {
return -1;
}
Path basePath = getBasePath();
Path portPath = basePath.resolve(PORT_FILE_NAME);
Path pidPath = basePath.resolve(PID_FILE_NAME);
if (Files.exists(basePath)) {
RandomizedTest.rmDir(basePath);
}
Configuration cfg = new Configuration();
cfg.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, getBasePath().toAbsolutePath().toString());
// lower default permission
cfg.set(DFSConfigKeys.DFS_DATANODE_DATA_DIR_PERMISSION_KEY, "766");
dfs = new MiniDFSCluster.Builder(cfg).build();
int port = dfs.getNameNodePort();
// write port
Files.write(portPath, Integer.toString(port).getBytes(StandardCharsets.UTF_8));
// write pid
Files.write(pidPath, Long.toString(JvmInfo.jvmInfo().getPid()).getBytes(StandardCharsets.UTF_8));
System.out.printf(Locale.ROOT, "Started HDFS at %s\n", dfs.getURI());
System.out.printf(Locale.ROOT, "Port information available at %s\n", portPath.toRealPath());
System.out.printf(Locale.ROOT, "PID information available at %s\n", pidPath.toRealPath());
return port;
}
private static Path getBasePath() {
Path tmpFolder = PathUtils.get(System.getProperty("java.io.tmpdir"));
// "test.build.data"
String baseFolder = System.getProperty(MiniDFSCluster.PROP_TEST_BUILD_DATA, "es-test/build/test/data");
return tmpFolder.resolve(baseFolder);
}
public static int getPort() throws IOException {
Path portPath = getBasePath().resolve(PORT_FILE_NAME);
if (Files.exists(portPath)) {
return Integer.parseInt(new String(Files.readAllBytes(portPath), StandardCharsets.UTF_8));
}
throw new IllegalStateException(String.format(Locale.ROOT, "Cannot find Mini DFS port file at %s ; was '%s' started?", portPath.toAbsolutePath(), MiniHDFS.class));
}
public static long getPid() throws Exception {
Path pidPath = getBasePath().resolve(PID_FILE_NAME);
if (Files.exists(pidPath)) {
return Long.parseLong(new String(Files.readAllBytes(pidPath), StandardCharsets.UTF_8));
}
throw new IllegalStateException(String.format(Locale.ROOT, "Cannot find Mini DFS pid file at %s ; was '%s' started?", pidPath.toAbsolutePath(), MiniHDFS.class));
}
public static void stop() {
if (dfs != null) {
dfs.shutdown(true);
dfs = null;
}
}
}
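A brief usage sketch (illustrative, not part of this commit): once start() has run in its own JVM, other processes can discover the cluster through the files it writes; the variable names are hypothetical and exception handling is omitted:

int port = MiniHDFS.getPort();                // read back from the minihdfs.port file
long pid = MiniHDFS.getPid();                 // read back from the minihdfs.pid file, e.g. for a kill/stop task
String repoUri = "hdfs://127.0.0.1:" + port;  // matches what HdfsTests passes as the repository "uri" setting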

View File

@ -1,48 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.plugin.hadoop.hdfs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.elasticsearch.common.SuppressForbidden;
import java.io.File;
public class MiniHDFSCluster {
@SuppressForbidden(reason = "Hadoop is messy")
public static void main(String[] args) throws Exception {
FileUtil.fullyDelete(new File(System.getProperty("test.build.data", "build/test/data"), "dfs/"));
// MiniHadoopClusterManager.main(new String[] { "-nomr" });
Configuration cfg = new Configuration();
cfg.set(DataNode.DATA_DIR_PERMISSION_KEY, "666");
cfg.set("dfs.replication", "0");
MiniDFSCluster dfsCluster = new MiniDFSCluster(cfg, 1, true, null);
FileSystem fs = dfsCluster.getFileSystem();
System.out.println(fs.getClass());
System.out.println(fs.getUri());
System.out.println(dfsCluster.getHftpFileSystem().getClass());
// dfsCluster.shutdown();
}
}

View File

@ -1,5 +1,3 @@
package org.elasticsearch.plugin.hadoop.hdfs;
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
@ -18,6 +16,7 @@ package org.elasticsearch.plugin.hadoop.hdfs;
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.plugin.hadoop.hdfs;
import org.elasticsearch.test.ESTestCase;

View File

@ -0,0 +1,25 @@
# Integration tests for HDFS Repository plugin
#
# Check plugin is installed
#
"HDFS Repository Config":
- do:
snapshot.create_repository:
repository: test_repo_hdfs_1
verify: false
body:
type: hdfs
settings:
# local HDFS implementation
conf.fs.es-hdfs.impl: "org.elasticsearch.repositories.hdfs.TestingFs"
uri: "es-hdfs://./build/"
path: "build/data/repo-hdfs"
# Get repository
- do:
snapshot.get_repository:
repository: test_repo_hdfs_1
- is_true: test_repo_hdfs_1
- is_true: test_repo_hdfs_1.settings.uri
- match: {test_repo_hdfs_1.settings.path : "build/data/repo-hdfs"}