Merge branch 'hdfs2-only' of github.com:costin/elasticsearch into hdfs2-only
commit 7ac49bb278
@@ -37,8 +37,7 @@ The node must be stopped before removing the plugin.
 The HDFS snapshot/restore plugin is built against the latest Apache Hadoop 2.x (currently 2.7.1). If the distro you are using is not protocol
 compatible with Apache Hadoop, consider replacing the Hadoop libraries inside the plugin folder with your own (you might have to adjust the security permissions required).
 
-Even if Hadoop is already installed on the Elasticsearch nodes, for security reasons, the required libraries need to be placed under the plugin folder.
-Note that in most cases, if the distro is compatible, one simply needs to configure the repository with the appropriate Hadoop configuration files (see below).
+Even if Hadoop is already installed on the Elasticsearch nodes, for security reasons, the required libraries need to be placed under the plugin folder. Note that in most cases, if the distro is compatible, one simply needs to configure the repository with the appropriate Hadoop configuration files (see below).
 
 Windows Users::
 Using Apache Hadoop on Windows is problematic and thus it is not recommended. For those _really_ wanting to use it, make sure you place the elusive `winutils.exe` under the
@@ -54,8 +53,8 @@ Once installed, define the configuration for the `hdfs` repository through `elas
 ----
 repositories
     hdfs:
-        uri: "hdfs://<host>:<port>/"    # optional - Hadoop file-system URI
-        path: "some/path"               # required - path with the file-system where data is stored/loaded
+        uri: "hdfs://<host>:<port>/"    # required - HDFS address only
+        path: "some/path"               # required - path within the file-system where data is stored/loaded
         load_defaults: "true"           # optional - whether to load the default Hadoop configuration (default) or not
         conf_location: "extra-cfg.xml"  # optional - Hadoop configuration XML to be loaded (use commas for multi values)
         conf.<key> : "<value>"          # optional - 'inlined' key=value added to the Hadoop configuration
@@ -64,6 +63,3 @@ repositories
         chunk_size: "10mb"              # optional - chunk size (disabled by default)
 
 ----
-
-NOTE: Be careful when including a paths within the `uri` setting; Some implementations ignore them completely while
-others consider them. In general, we recommend keeping the `uri` to a minimum and using the `path` element instead.
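For illustration only, the same settings can also be supplied when registering the repository at runtime through the Java client, which is the pattern the plugin's own tests (further down in this diff) rely on. A minimal sketch follows; the repository name, namenode address, and path are placeholders, not values taken from this commit, and a connected `Client` is assumed.

----
import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.Settings;

public class HdfsRepositoryRegistration {

    // Registers an 'hdfs' snapshot repository with the settings documented above.
    // Assumes an already connected Client; name, address, and path are placeholders.
    public static void register(Client client) {
        PutRepositoryResponse response = client.admin().cluster().preparePutRepository("my_hdfs_repository")
                .setType("hdfs")
                .setSettings(Settings.settingsBuilder()
                        .put("uri", "hdfs://namenode:8020/")          // HDFS address only, no path component
                        .put("path", "elasticsearch/repositories")    // path within the file-system
                        .put("load_defaults", "true")
                        .put("chunk_size", "10mb"))
                .get();
        if (!response.isAcknowledged()) {
            throw new IllegalStateException("hdfs repository registration was not acknowledged");
        }
    }
}
----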
@@ -62,7 +62,6 @@ public class HdfsPlugin extends Plugin {
         return "HDFS Repository Plugin";
     }
 
-    @SuppressWarnings("unchecked")
     public void onModule(RepositoriesModule repositoriesModule) {
         repositoriesModule.registerRepository("hdfs", HdfsRepository.class, BlobStoreIndexShardRepository.class);
     }
@@ -40,9 +40,9 @@ public class HdfsBlobStore extends AbstractComponent implements BlobStore {
     private final ThreadPool threadPool;
     private final int bufferSizeInBytes;
 
-    public HdfsBlobStore(Settings settings, FileContextFactory ffs, Path path, ThreadPool threadPool) throws IOException {
+    public HdfsBlobStore(Settings settings, FileContextFactory fcf, Path path, ThreadPool threadPool) throws IOException {
         super(settings);
-        this.fcf = ffs;
+        this.fcf = fcf;
         this.rootHdfsPath = path;
         this.threadPool = threadPool;
 
@@ -18,27 +18,6 @@
  */
 package org.elasticsearch.repositories.hdfs;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.AbstractFileSystem;
-import org.apache.hadoop.fs.FileContext;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.elasticsearch.ElasticsearchException;
-import org.elasticsearch.ElasticsearchGenerationException;
-import org.elasticsearch.SpecialPermission;
-import org.elasticsearch.common.Strings;
-import org.elasticsearch.common.SuppressForbidden;
-import org.elasticsearch.common.blobstore.BlobPath;
-import org.elasticsearch.common.blobstore.BlobStore;
-import org.elasticsearch.common.inject.Inject;
-import org.elasticsearch.common.io.PathUtils;
-import org.elasticsearch.common.unit.ByteSizeValue;
-import org.elasticsearch.index.snapshots.IndexShardRepository;
-import org.elasticsearch.repositories.RepositoryName;
-import org.elasticsearch.repositories.RepositorySettings;
-import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
-import org.elasticsearch.threadpool.ThreadPool;
-
 import java.io.IOException;
 import java.net.MalformedURLException;
 import java.net.URI;
@@ -51,6 +30,26 @@ import java.util.Locale;
 import java.util.Map;
 import java.util.Map.Entry;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.AbstractFileSystem;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.ElasticsearchGenerationException;
+import org.elasticsearch.SpecialPermission;
+import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.blobstore.BlobPath;
+import org.elasticsearch.common.blobstore.BlobStore;
+import org.elasticsearch.common.inject.Inject;
+import org.elasticsearch.common.io.PathUtils;
+import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.index.snapshots.IndexShardRepository;
+import org.elasticsearch.repositories.RepositoryName;
+import org.elasticsearch.repositories.RepositorySettings;
+import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
+import org.elasticsearch.threadpool.ThreadPool;
+
 public class HdfsRepository extends BlobStoreRepository implements FileContextFactory {
 
     public final static String TYPE = "hdfs";
@@ -61,6 +60,7 @@ public class HdfsRepository extends BlobStoreRepository implements FileContextFa
     private final RepositorySettings repositorySettings;
     private final ThreadPool threadPool;
     private final String path;
+    private final String uri;
    private FileContext fc;
    private HdfsBlobStore blobStore;
 
@@ -71,6 +71,7 @@ public class HdfsRepository extends BlobStoreRepository implements FileContextFa
         this.repositorySettings = repositorySettings;
         this.threadPool = threadPool;
 
+        uri = repositorySettings.settings().get("uri", settings.get("uri"));
         path = repositorySettings.settings().get("path", settings.get("path"));
 
 
@@ -81,9 +82,25 @@ public class HdfsRepository extends BlobStoreRepository implements FileContextFa
 
     @Override
     protected void doStart() {
+        if (!Strings.hasText(uri)) {
+            throw new IllegalArgumentException("No 'uri' defined for hdfs snapshot/restore");
+        }
+
+        URI actualUri = URI.create(uri);
+        String scheme = actualUri.getScheme();
+        if (!Strings.hasText(scheme) || !scheme.toLowerCase(Locale.ROOT).equals("hdfs")) {
+            throw new IllegalArgumentException(
+                    String.format(Locale.ROOT, "Invalid scheme [%s] specified in uri [%s]; only 'hdfs' uri allowed for hdfs snapshot/restore", scheme, uri));
+        }
+        String p = actualUri.getPath();
+        if (Strings.hasText(p) && !p.equals("/")) {
+            throw new IllegalArgumentException(String.format(Locale.ROOT,
+                    "Use 'path' option to specify a path [%s], not the uri [%s] for hdfs snapshot/restore", p, uri));
+        }
+
         // get configuration
         if (path == null) {
-            throw new IllegalArgumentException("no 'path' defined for hdfs snapshot/restore");
+            throw new IllegalArgumentException("No 'path' defined for hdfs snapshot/restore");
         }
         try {
             fc = getFileContext();
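The checks added above lean entirely on `java.net.URI` to split the scheme from the path. A small standalone sketch of that behaviour (the host and port are made up) shows which values would pass or trip the validation:

----
import java.net.URI;

public class UriValidationSketch {
    public static void main(String[] args) {
        // Accepted: scheme is 'hdfs' and the path is empty or just "/"
        URI ok = URI.create("hdfs://namenode:8020/");
        System.out.println(ok.getScheme() + " path=[" + ok.getPath() + "]");   // hdfs path=[/]

        // Rejected: the path has to go into the separate 'path' setting instead
        URI withPath = URI.create("hdfs://namenode:8020/backups");
        System.out.println(withPath.getPath());                                // /backups

        // Rejected: only the 'hdfs' scheme is allowed
        URI wrongScheme = URI.create("file:///");
        System.out.println(wrongScheme.getScheme());                           // file
    }
}
----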
@@ -186,13 +203,10 @@ public class HdfsRepository extends BlobStoreRepository implements FileContextFa
             throw new ElasticsearchGenerationException(String.format(Locale.ROOT, "Cannot initialize Hadoop"), th);
         }
 
-        String uri = repositorySettings.settings().get("uri", settings.get("uri"));
-        URI actualUri = (uri != null ? URI.create(uri) : null);
-
+        URI actualUri = URI.create(uri);
         try {
             // disable FS cache
-            String disableFsCache = String.format(Locale.ROOT, "fs.%s.impl.disable.cache", actualUri.getScheme());
-            cfg.setBoolean(disableFsCache, true);
+            cfg.setBoolean("fs.hdfs.impl.disable.cache", true);
 
             // create the AFS manually since through FileContext is relies on Subject.doAs for no reason at all
             AbstractFileSystem fs = AbstractFileSystem.get(actualUri, cfg);
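For context, the cache-disabling and manual `AbstractFileSystem` creation above boil down to the following standalone sketch; it is an illustration only, needs the Hadoop client jars on the classpath, and uses a placeholder namenode address:

----
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.AbstractFileSystem;
import org.apache.hadoop.fs.FileContext;

public class FileContextSketch {
    public static void main(String[] args) throws Exception {
        Configuration cfg = new Configuration();
        // Disable the FileSystem cache for the hdfs scheme, mirroring the repository code above.
        cfg.setBoolean("fs.hdfs.impl.disable.cache", true);

        URI uri = URI.create("hdfs://namenode:8020/");  // placeholder address
        // Build the AbstractFileSystem directly (rather than FileContext.getFileContext(uri, cfg))
        // so no Subject.doAs wrapping is involved, then wrap it in a FileContext.
        AbstractFileSystem afs = AbstractFileSystem.get(uri, cfg);
        FileContext fc = FileContext.getFileContext(afs, cfg);
        System.out.println(fc.getDefaultFileSystem().getUri());
    }
}
----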
@@ -202,7 +216,6 @@ public class HdfsRepository extends BlobStoreRepository implements FileContextFa
         }
     }
 
-    @SuppressForbidden(reason = "pick up Hadoop config (which can be on HDFS)")
     private void addConfigLocation(Configuration cfg, String confLocation) {
         URL cfgURL = null;
         // it's an URL
@@ -1 +0,0 @@
-Folder containing the required Hadoop client libraries and dependencies.
@@ -18,6 +18,11 @@
  */
 package org.elasticsearch.plugin.hadoop.hdfs;
 
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThan;
+
+import java.util.Collection;
+
 import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryResponse;
 import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
 import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
@@ -26,17 +31,11 @@ import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.plugins.Plugin;
 import org.elasticsearch.repositories.RepositoryException;
-import org.elasticsearch.plugin.hadoop.hdfs.HdfsPlugin;
 import org.elasticsearch.snapshots.SnapshotState;
 import org.elasticsearch.test.ESIntegTestCase;
 import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
 import org.elasticsearch.test.ESIntegTestCase.Scope;
 
-import java.util.Collection;
-
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.greaterThan;
-
 @ClusterScope(scope = Scope.SUITE, numDataNodes = 1, transportClientRatio = 0.0)
 public class HdfsTests extends ESIntegTestCase {
 
@@ -51,8 +50,8 @@ public class HdfsTests extends ESIntegTestCase {
         PutRepositoryResponse putRepositoryResponse = client.admin().cluster().preparePutRepository("test-repo")
                 .setType("hdfs")
                 .setSettings(Settings.settingsBuilder()
-                        .put("uri", "file:///")
-                        .put("conf.fs.AbstractFileSystem.file.impl", TestingFs.class.getName())
+                        .put("uri", "hdfs:///")
+                        .put("conf.fs.AbstractFileSystem.hdfs.impl", TestingFs.class.getName())
                         .put("path", "foo")
                         .put("conf", "additional-cfg.xml, conf-2.xml")
                         .put("chunk_size", randomIntBetween(100, 1000) + "k")
@@ -121,7 +120,6 @@ public class HdfsTests extends ESIntegTestCase {
         assertThat(clusterState.getMetaData().hasIndex("test-idx-2"), equalTo(false));
     }
 
-    // RepositoryVerificationException.class
     public void testWrongPath() {
         Client client = client();
 
@@ -129,8 +127,8 @@ public class HdfsTests extends ESIntegTestCase {
         PutRepositoryResponse putRepositoryResponse = client.admin().cluster().preparePutRepository("test-repo")
                 .setType("hdfs")
                 .setSettings(Settings.settingsBuilder()
-                        .put("uri", "file:///")
-                        .put("conf.fs.AbstractFileSystem.file.impl", TestingFs.class.getName())
+                        .put("uri", "hdfs:///")
+                        .put("conf.fs.AbstractFileSystem.hdfs.impl", TestingFs.class.getName())
                         .put("path", "a@b$c#11:22")
                         .put("chunk_size", randomIntBetween(100, 1000) + "k")
                         .put("compress", randomBoolean()))
@@ -144,8 +142,52 @@ public class HdfsTests extends ESIntegTestCase {
             // expected
         }
     }
 
+    public void testNonHdfsUri() {
+        Client client = client();
+        try {
+            PutRepositoryResponse putRepositoryResponse = client.admin().cluster().preparePutRepository("test-repo")
+                    .setType("hdfs")
+                    .setSettings(Settings.settingsBuilder()
+                            .put("uri", "file:///")
+                            .put("conf.fs.AbstractFileSystem.hdfs.impl", TestingFs.class.getName())
+                            .put("path", "should-fail")
+                            .put("chunk_size", randomIntBetween(100, 1000) + "k")
+                            .put("compress", randomBoolean()))
+                    .get();
+            assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true));
+
+            createIndex("test-idx-1", "test-idx-2", "test-idx-3");
+            ensureGreen();
+            fail("Path name is invalid");
+        } catch (RepositoryException re) {
+            // expected
+        }
+    }
+
+    public void testPathSpecifiedInHdfs() {
+        Client client = client();
+        try {
+            PutRepositoryResponse putRepositoryResponse = client.admin().cluster().preparePutRepository("test-repo")
+                    .setType("hdfs")
+                    .setSettings(Settings.settingsBuilder()
+                            .put("uri", "hdfs:///some/path")
+                            .put("conf.fs.AbstractFileSystem.hdfs.impl", TestingFs.class.getName())
+                            .put("path", "should-fail")
+                            .put("chunk_size", randomIntBetween(100, 1000) + "k")
+                            .put("compress", randomBoolean()))
+                    .get();
+            assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true));
+
+            createIndex("test-idx-1", "test-idx-2", "test-idx-3");
+            ensureGreen();
+            fail("Path name is invalid");
+        } catch (RepositoryException re) {
+            // expected
+        }
+    }
+
     private long count(Client client, String index) {
         return client.prepareSearch(index).setSize(0).get().getHits().totalHits();
     }
 }