diff --git a/docs/plugins/repository-hdfs.asciidoc b/docs/plugins/repository-hdfs.asciidoc index 53052604514..28abaf78f66 100644 --- a/docs/plugins/repository-hdfs.asciidoc +++ b/docs/plugins/repository-hdfs.asciidoc @@ -37,8 +37,7 @@ The node must be stopped before removing the plugin. The HDFS snapshot/restore plugin is built against the latest Apache Hadoop 2.x (currently 2.7.1). If the distro you are using is not protocol compatible with Apache Hadoop, consider replacing the Hadoop libraries inside the plugin folder with your own (you might have to adjust the security permissions required). -Even if Hadoop is already installed on the Elasticsearch nodes, for security reasons, the required libraries need to be placed under the plugin folder. -Note that in most cases, if the distro is compatible, one simply needs to configure the repository with the appropriate Hadoop configuration files (see below). +Even if Hadoop is already installed on the Elasticsearch nodes, for security reasons, the required libraries need to be placed under the plugin folder. Note that in most cases, if the distro is compatible, one simply needs to configure the repository with the appropriate Hadoop configuration files (see below). Windows Users:: Using Apache Hadoop on Windows is problematic and thus it is not recommended. 
For those _really_ wanting to use it, make sure you place the elusive `winutils.exe` under the @@ -54,8 +53,8 @@ Once installed, define the configuration for the `hdfs` repository through `elas ---- repositories hdfs: - uri: "hdfs://:/" \# optional - Hadoop file-system URI - path: "some/path" \# required - path with the file-system where data is stored/loaded + uri: "hdfs://:/" \# required - HDFS address only + path: "some/path" \# required - path within the file-system where data is stored/loaded load_defaults: "true" \# optional - whether to load the default Hadoop configuration (default) or not conf_location: "extra-cfg.xml" \# optional - Hadoop configuration XML to be loaded (use commas for multi values) conf. : "" \# optional - 'inlined' key=value added to the Hadoop configuration @@ -64,6 +63,3 @@ repositories chunk_size: "10mb" \# optional - chunk size (disabled by default) ---- - -NOTE: Be careful when including a paths within the `uri` setting; Some implementations ignore them completely while -others consider them. In general, we recommend keeping the `uri` to a minimum and using the `path` element instead. 
\ No newline at end of file diff --git a/plugins/repository-hdfs/src/main/java/org/elasticsearch/plugin/hadoop/hdfs/HdfsPlugin.java b/plugins/repository-hdfs/src/main/java/org/elasticsearch/plugin/hadoop/hdfs/HdfsPlugin.java index ba6d840c181..a14ed793e1e 100644 --- a/plugins/repository-hdfs/src/main/java/org/elasticsearch/plugin/hadoop/hdfs/HdfsPlugin.java +++ b/plugins/repository-hdfs/src/main/java/org/elasticsearch/plugin/hadoop/hdfs/HdfsPlugin.java @@ -62,7 +62,6 @@ public class HdfsPlugin extends Plugin { return "HDFS Repository Plugin"; } - @SuppressWarnings("unchecked") public void onModule(RepositoriesModule repositoriesModule) { repositoriesModule.registerRepository("hdfs", HdfsRepository.class, BlobStoreIndexShardRepository.class); } diff --git a/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsBlobStore.java b/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsBlobStore.java index 9c6dac7b68a..815c0d1eff3 100644 --- a/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsBlobStore.java +++ b/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsBlobStore.java @@ -40,9 +40,9 @@ public class HdfsBlobStore extends AbstractComponent implements BlobStore { private final ThreadPool threadPool; private final int bufferSizeInBytes; - public HdfsBlobStore(Settings settings, FileContextFactory ffs, Path path, ThreadPool threadPool) throws IOException { + public HdfsBlobStore(Settings settings, FileContextFactory fcf, Path path, ThreadPool threadPool) throws IOException { super(settings); - this.fcf = ffs; + this.fcf = fcf; this.rootHdfsPath = path; this.threadPool = threadPool; diff --git a/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsRepository.java b/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsRepository.java index b00c72bad8d..75aac89b987 100644 --- 
a/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsRepository.java +++ b/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsRepository.java @@ -18,27 +18,6 @@ */ package org.elasticsearch.repositories.hdfs; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.AbstractFileSystem; -import org.apache.hadoop.fs.FileContext; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.security.UserGroupInformation; -import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.ElasticsearchGenerationException; -import org.elasticsearch.SpecialPermission; -import org.elasticsearch.common.Strings; -import org.elasticsearch.common.SuppressForbidden; -import org.elasticsearch.common.blobstore.BlobPath; -import org.elasticsearch.common.blobstore.BlobStore; -import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.io.PathUtils; -import org.elasticsearch.common.unit.ByteSizeValue; -import org.elasticsearch.index.snapshots.IndexShardRepository; -import org.elasticsearch.repositories.RepositoryName; -import org.elasticsearch.repositories.RepositorySettings; -import org.elasticsearch.repositories.blobstore.BlobStoreRepository; -import org.elasticsearch.threadpool.ThreadPool; - import java.io.IOException; import java.net.MalformedURLException; import java.net.URI; @@ -51,6 +30,26 @@ import java.util.Locale; import java.util.Map; import java.util.Map.Entry; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.AbstractFileSystem; +import org.apache.hadoop.fs.FileContext; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.security.UserGroupInformation; +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.ElasticsearchGenerationException; +import org.elasticsearch.SpecialPermission; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.blobstore.BlobPath; +import 
org.elasticsearch.common.blobstore.BlobStore; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.PathUtils; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.index.snapshots.IndexShardRepository; +import org.elasticsearch.repositories.RepositoryName; +import org.elasticsearch.repositories.RepositorySettings; +import org.elasticsearch.repositories.blobstore.BlobStoreRepository; +import org.elasticsearch.threadpool.ThreadPool; + public class HdfsRepository extends BlobStoreRepository implements FileContextFactory { public final static String TYPE = "hdfs"; @@ -61,6 +60,7 @@ public class HdfsRepository extends BlobStoreRepository implements FileContextFa private final RepositorySettings repositorySettings; private final ThreadPool threadPool; private final String path; + private final String uri; private FileContext fc; private HdfsBlobStore blobStore; @@ -71,6 +71,7 @@ public class HdfsRepository extends BlobStoreRepository implements FileContextFa this.repositorySettings = repositorySettings; this.threadPool = threadPool; + uri = repositorySettings.settings().get("uri", settings.get("uri")); path = repositorySettings.settings().get("path", settings.get("path")); @@ -81,9 +82,25 @@ public class HdfsRepository extends BlobStoreRepository implements FileContextFa @Override protected void doStart() { + if (!Strings.hasText(uri)) { + throw new IllegalArgumentException("No 'uri' defined for hdfs snapshot/restore"); + } + + URI actualUri = URI.create(uri); + String scheme = actualUri.getScheme(); + if (!Strings.hasText(scheme) || !scheme.toLowerCase(Locale.ROOT).equals("hdfs")) { + throw new IllegalArgumentException( + String.format(Locale.ROOT, "Invalid scheme [%s] specified in uri [%s]; only 'hdfs' uri allowed for hdfs snapshot/restore", scheme, uri)); + } + String p = actualUri.getPath(); + if (Strings.hasText(p) && !p.equals("/")) { + throw new IllegalArgumentException(String.format(Locale.ROOT, + "Use 
'path' option to specify a path [%s], not the uri [%s] for hdfs snapshot/restore", p, uri)); + } + // get configuration if (path == null) { - throw new IllegalArgumentException("no 'path' defined for hdfs snapshot/restore"); + throw new IllegalArgumentException("No 'path' defined for hdfs snapshot/restore"); } try { fc = getFileContext(); @@ -186,13 +203,10 @@ public class HdfsRepository extends BlobStoreRepository implements FileContextFa throw new ElasticsearchGenerationException(String.format(Locale.ROOT, "Cannot initialize Hadoop"), th); } - String uri = repositorySettings.settings().get("uri", settings.get("uri")); - URI actualUri = (uri != null ? URI.create(uri) : null); - + URI actualUri = URI.create(uri); try { // disable FS cache - String disableFsCache = String.format(Locale.ROOT, "fs.%s.impl.disable.cache", actualUri.getScheme()); - cfg.setBoolean(disableFsCache, true); + cfg.setBoolean("fs.hdfs.impl.disable.cache", true); // create the AFS manually since through FileContext is relies on Subject.doAs for no reason at all AbstractFileSystem fs = AbstractFileSystem.get(actualUri, cfg); @@ -202,7 +216,6 @@ public class HdfsRepository extends BlobStoreRepository implements FileContextFa } } - @SuppressForbidden(reason = "pick up Hadoop config (which can be on HDFS)") private void addConfigLocation(Configuration cfg, String confLocation) { URL cfgURL = null; // it's an URL diff --git a/plugins/repository-hdfs/src/main/resources/hadoop-libs/README.asciidoc b/plugins/repository-hdfs/src/main/resources/hadoop-libs/README.asciidoc deleted file mode 100644 index e9f85f3cdf7..00000000000 --- a/plugins/repository-hdfs/src/main/resources/hadoop-libs/README.asciidoc +++ /dev/null @@ -1 +0,0 @@ -Folder containing the required Hadoop client libraries and dependencies. 
\ No newline at end of file diff --git a/plugins/repository-hdfs/src/test/java/org/elasticsearch/plugin/hadoop/hdfs/HdfsTests.java b/plugins/repository-hdfs/src/test/java/org/elasticsearch/plugin/hadoop/hdfs/HdfsTests.java index 9728c58203d..d7c1a37c03d 100644 --- a/plugins/repository-hdfs/src/test/java/org/elasticsearch/plugin/hadoop/hdfs/HdfsTests.java +++ b/plugins/repository-hdfs/src/test/java/org/elasticsearch/plugin/hadoop/hdfs/HdfsTests.java @@ -18,6 +18,11 @@ */ package org.elasticsearch.plugin.hadoop.hdfs; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; + +import java.util.Collection; + import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryResponse; import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse; import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse; @@ -26,17 +31,11 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.repositories.RepositoryException; -import org.elasticsearch.plugin.hadoop.hdfs.HdfsPlugin; import org.elasticsearch.snapshots.SnapshotState; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.ESIntegTestCase.ClusterScope; import org.elasticsearch.test.ESIntegTestCase.Scope; -import java.util.Collection; - -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.greaterThan; - @ClusterScope(scope = Scope.SUITE, numDataNodes = 1, transportClientRatio = 0.0) public class HdfsTests extends ESIntegTestCase { @@ -51,8 +50,8 @@ public class HdfsTests extends ESIntegTestCase { PutRepositoryResponse putRepositoryResponse = client.admin().cluster().preparePutRepository("test-repo") .setType("hdfs") .setSettings(Settings.settingsBuilder() - .put("uri", "file:///") - .put("conf.fs.AbstractFileSystem.file.impl", TestingFs.class.getName()) + 
.put("uri", "hdfs:///") + .put("conf.fs.AbstractFileSystem.hdfs.impl", TestingFs.class.getName()) + .put("path", "foo") + .put("conf", "additional-cfg.xml, conf-2.xml") + .put("chunk_size", randomIntBetween(100, 1000) + "k") @@ -121,7 +120,6 @@ public class HdfsTests extends ESIntegTestCase { assertThat(clusterState.getMetaData().hasIndex("test-idx-2"), equalTo(false)); } - // RepositoryVerificationException.class public void testWrongPath() { Client client = client(); @@ -129,8 +127,8 @@ public class HdfsTests extends ESIntegTestCase { PutRepositoryResponse putRepositoryResponse = client.admin().cluster().preparePutRepository("test-repo") .setType("hdfs") .setSettings(Settings.settingsBuilder() - .put("uri", "file:///") - .put("conf.fs.AbstractFileSystem.file.impl", TestingFs.class.getName()) + .put("uri", "hdfs:///") + .put("conf.fs.AbstractFileSystem.hdfs.impl", TestingFs.class.getName()) .put("path", "a@b$c#11:22") .put("chunk_size", randomIntBetween(100, 1000) + "k") .put("compress", randomBoolean())) @@ -144,8 +142,52 @@ // expected } } + + public void testNonHdfsUri() { + Client client = client(); + try { + PutRepositoryResponse putRepositoryResponse = client.admin().cluster().preparePutRepository("test-repo") + .setType("hdfs") + .setSettings(Settings.settingsBuilder() + .put("uri", "file:///") + .put("conf.fs.AbstractFileSystem.hdfs.impl", TestingFs.class.getName()) + .put("path", "should-fail") + .put("chunk_size", randomIntBetween(100, 1000) + "k") + .put("compress", randomBoolean())) + .get(); + assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true)); + + createIndex("test-idx-1", "test-idx-2", "test-idx-3"); + ensureGreen(); + fail("Non-hdfs URI scheme should be rejected"); + } catch (RepositoryException re) { + // expected + } + } + + public void testPathSpecifiedInHdfs() { + Client client = client(); + try { + PutRepositoryResponse putRepositoryResponse = client.admin().cluster().preparePutRepository("test-repo") + 
.setType("hdfs") + .setSettings(Settings.settingsBuilder() + .put("uri", "hdfs:///some/path") + .put("conf.fs.AbstractFileSystem.hdfs.impl", TestingFs.class.getName()) + .put("path", "should-fail") + .put("chunk_size", randomIntBetween(100, 1000) + "k") + .put("compress", randomBoolean())) + .get(); + assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true)); + + createIndex("test-idx-1", "test-idx-2", "test-idx-3"); + ensureGreen(); + fail("URI containing a path should be rejected; the path must go in the 'path' setting"); + } catch (RepositoryException re) { + // expected + } + } private long count(Client client, String index) { return client.prepareSearch(index).setSize(0).get().getHits().totalHits(); } }