initial work on hadoop plugin - gateway hdfs support

kimchy 2010-05-23 08:17:52 +03:00
parent 596f9db8d8
commit 5c6864e8b3
19 changed files with 1234 additions and 13 deletions
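For context, the new gateway is wired in entirely through node settings: gateway.type selects it, gateway.hdfs.path gives the base path (HdfsGateway below refuses to start without it), and every setting under the hdfs.conf. prefix is copied into the Hadoop Configuration. A minimal Java sketch of starting a node against it, modeled on the HdfsGatewayTests added in this commit; the fs.default.name URI is a placeholder, not something this commit ships:

import org.elasticsearch.node.Node;

import static org.elasticsearch.node.NodeBuilder.*;
import static org.elasticsearch.util.settings.ImmutableSettings.*;

public class HdfsGatewayUsageSketch {
    public static void main(String[] args) {
        // gateway.type picks the HDFS gateway, gateway.hdfs.path is required by HdfsGateway,
        // and anything under "hdfs.conf." ends up in the Hadoop Configuration.
        Node node = nodeBuilder().settings(settingsBuilder()
                .put("gateway.type", "hdfs")
                .put("gateway.hdfs.path", "work/hdfs/gateway")
                .put("hdfs.conf.fs.default.name", "hdfs://localhost:9000")) // placeholder URI
                .build().start();
        // ... index documents, run a gateway snapshot, restart the node to recover ...
        node.close();
    }
}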

@@ -19,6 +19,7 @@
<w>cloudservers</w>
<w>commitable</w>
<w>committable</w>
<w>conf</w>
<w>configurator</w>
<w>coord</w>
<w>cpus</w>
@@ -34,6 +35,8 @@
<w>formatters</w>
<w>frac</w>
<w>freqs</w>
<w>hadoop</w>
<w>hdfs</w>
<w>hpux</w>
<w>hyperic</w>
<w>ifconfig</w>

1
.idea/modules.xml generated

@@ -11,6 +11,7 @@
<module fileurl="file://$PROJECT_DIR$/.idea/modules//plugin-mapper-attachments.iml" filepath="$PROJECT_DIR$/.idea/modules//plugin-mapper-attachments.iml" />
<module fileurl="file://$PROJECT_DIR$/.idea/modules//plugin-transport-memcached.iml" filepath="$PROJECT_DIR$/.idea/modules//plugin-transport-memcached.iml" />
<module fileurl="file://$PROJECT_DIR$/.idea/modules//plugins-cloud.iml" filepath="$PROJECT_DIR$/.idea/modules//plugins-cloud.iml" />
<module fileurl="file://$PROJECT_DIR$/.idea/modules//plugins-hadoop.iml" filepath="$PROJECT_DIR$/.idea/modules//plugins-hadoop.iml" />
<module fileurl="file://$PROJECT_DIR$/.idea/modules//test-integration.iml" filepath="$PROJECT_DIR$/.idea/modules//test-integration.iml" />
<module fileurl="file://$PROJECT_DIR$/.idea/modules//test-testng.iml" filepath="$PROJECT_DIR$/.idea/modules//test-testng.iml" />
</modules>

35
.idea/modules/plugins-hadoop.iml generated Normal file

@@ -0,0 +1,35 @@
<?xml version="1.0" encoding="UTF-8"?>
<module type="JAVA_MODULE" version="4">
<component name="NewModuleRootManager" inherit-compiler-output="false">
<output url="file://$MODULE_DIR$/../../plugins/hadoop/build/classes/main" />
<output-test url="file://$MODULE_DIR$/../../plugins/hadoop/build/classes/test" />
<exclude-output />
<content url="file://$MODULE_DIR$/../../plugins/hadoop">
<sourceFolder url="file://$MODULE_DIR$/../../plugins/hadoop/src/main/java" isTestSource="false" />
<sourceFolder url="file://$MODULE_DIR$/../../plugins/hadoop/src/test/java" isTestSource="true" />
<excludeFolder url="file://$MODULE_DIR$/../../plugins/hadoop/build" />
</content>
<orderEntry type="inheritedJdk" />
<orderEntry type="sourceFolder" forTests="false" />
<orderEntry type="module" module-name="elasticsearch" />
<orderEntry type="module-library">
<library name="hadoop">
<CLASSES>
<root url="jar://$GRADLE_REPOSITORY$/org.apache.hadoop/hadoop-core/jars/hadoop-core-0.20.2.jar!/" />
<root url="jar://$GRADLE_REPOSITORY$/commons-logging/commons-logging/jars/commons-logging-1.1.1.jar!/" />
</CLASSES>
<JAVADOC />
<SOURCES>
<root url="file://$MODULE_DIR$/../../../../../opt/hadoop/0.20.2/src/mapred" />
<root url="file://$MODULE_DIR$/../../../../../opt/hadoop/0.20.2/src/hdfs" />
<root url="file://$MODULE_DIR$/../../../../../opt/hadoop/0.20.2/src/tools" />
<root url="file://$MODULE_DIR$/../../../../../opt/hadoop/0.20.2/src/core" />
</SOURCES>
</library>
</orderEntry>
<orderEntry type="module" module-name="test-testng" scope="TEST" />
<orderEntry type="library" scope="TEST" name="testng" level="project" />
<orderEntry type="library" scope="TEST" name="hamcrest" level="project" />
</component>
</module>

@@ -19,9 +19,9 @@
package org.elasticsearch.gateway;
import org.elasticsearch.util.inject.Module;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.util.component.LifecycleComponent;
import org.elasticsearch.util.inject.Module;
/**
* @author kimchy (shay.banon)
@@ -34,5 +34,5 @@ public interface Gateway extends LifecycleComponent<Gateway> {
Class<? extends Module> suggestIndexGateway();
void reset();
void reset() throws Exception;
}

@@ -22,6 +22,7 @@ package org.elasticsearch.index.gateway.fs;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.store.IndexInput;
import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit;
import org.elasticsearch.index.gateway.IndexGateway;
import org.elasticsearch.index.gateway.IndexShardGateway;
import org.elasticsearch.index.gateway.IndexShardGatewayRecoveryException;
import org.elasticsearch.index.gateway.IndexShardGatewaySnapshotFailedException;
@@ -76,7 +77,7 @@ public class FsIndexShardGateway extends AbstractIndexShardComponent implements
private final File locationTranslog;
@Inject public FsIndexShardGateway(ShardId shardId, @IndexSettings Settings indexSettings, ThreadPool threadPool, FsIndexGateway fsIndexGateway,
@Inject public FsIndexShardGateway(ShardId shardId, @IndexSettings Settings indexSettings, ThreadPool threadPool, IndexGateway fsIndexGateway,
IndexShard indexShard, Store store, RecoveryThrottler recoveryThrottler) {
super(shardId, indexSettings);
this.threadPool = threadPool;
@@ -84,7 +85,7 @@ public class FsIndexShardGateway extends AbstractIndexShardComponent implements
this.store = store;
this.recoveryThrottler = recoveryThrottler;
this.location = new File(fsIndexGateway.indexGatewayHome(), Integer.toString(shardId.id()));
this.location = new File(((FsIndexGateway) fsIndexGateway).indexGatewayHome(), Integer.toString(shardId.id()));
this.locationIndex = new File(location, "index");
this.locationTranslog = new File(location, "translog");

@@ -93,6 +93,10 @@ public class Directories {
syncFile(copyTo);
}
public static void copyFromDirectory(Directory dir, String fileName, OutputStream os) throws IOException {
copyFromDirectory(dir.openInput(fileName), os);
}
public static void copyFromDirectory(IndexInput ii, OutputStream os) throws IOException {
final int BUFFER_SIZE = ii.length() < 16384 ? (int) ii.length() : 16384;
byte[] buffer = new byte[BUFFER_SIZE];
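These new Directories overloads are what the HDFS shard gateway below uses to stream Lucene files into HDFS output streams. A minimal sketch of the Directory-based overload; the Directory and file name are assumed to come from the caller, and a ByteArrayOutputStream stands in for the FSDataOutputStream that HdfsIndexShardGateway passes:

import java.io.ByteArrayOutputStream;
import java.io.IOException;

import org.apache.lucene.store.Directory;
import org.elasticsearch.util.lucene.Directories;

public class CopyFromDirectorySketch {
    // Copies a single file out of a Lucene Directory into any OutputStream.
    // 'dir' and 'fileName' are assumed to be supplied by the caller.
    static byte[] copyOneFile(Directory dir, String fileName) throws IOException {
        ByteArrayOutputStream os = new ByteArrayOutputStream();
        Directories.copyFromDirectory(dir, fileName, os); // overload added in this hunk
        return os.toByteArray();
    }
}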

@@ -41,14 +41,14 @@ import static org.hamcrest.Matchers.*;
*/
public abstract class AbstractSimpleIndexGatewayTests extends AbstractNodesTests {
@AfterMethod public void closeNodes() {
@AfterMethod public void closeNodes() throws Exception {
node("server1").stop();
// since we store (by default) the index snapshot under the gateway, resetting it will reset the index data as well
((InternalNode) node("server1")).injector().getInstance(Gateway.class).reset();
closeAllNodes();
}
@BeforeMethod public void buildNode1() {
@BeforeMethod public void buildNode1() throws Exception {
buildNode("server1");
// since we store (by default) the index snapshot under the gateway, resetting it will reset the index data as well
((InternalNode) node("server1")).injector().getInstance(Gateway.class).reset();

@@ -34,14 +34,14 @@ import static org.elasticsearch.client.Requests.*;
*/
public class FsMetaDataGatewayTests extends AbstractNodesTests {
@AfterMethod void closeNodes() {
@AfterMethod void closeNodes() throws Exception {
node("server1").stop();
// since we store (by default) the index snapshot under the gateway, resetting it will reset the index data as well
((InternalNode) node("server1")).injector().getInstance(Gateway.class).reset();
closeAllNodes();
}
@BeforeMethod void buildNode1() {
@BeforeMethod void buildNode1() throws Exception {
buildNode("server1");
// since we store (by default) the index snapshot under the gateway, resetting it will reset the index data as well
((InternalNode) node("server1")).injector().getInstance(Gateway.class).reset();

@@ -26,6 +26,7 @@ import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;
import org.elasticsearch.cloud.blobstore.CloudBlobStoreService;
import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit;
import org.elasticsearch.index.gateway.IndexGateway;
import org.elasticsearch.index.gateway.IndexShardGateway;
import org.elasticsearch.index.gateway.IndexShardGatewayRecoveryException;
import org.elasticsearch.index.gateway.IndexShardGatewaySnapshotFailedException;
@@ -100,7 +101,7 @@ public class CloudIndexShardGateway extends AbstractIndexShardComponent implemen
private volatile int currentTranslogPartToWrite = 1;
@Inject public CloudIndexShardGateway(ShardId shardId, @IndexSettings Settings indexSettings, IndexShard indexShard, ThreadPool threadPool,
Store store, RecoveryThrottler recoveryThrottler, CloudIndexGateway cloudIndexGateway, CloudBlobStoreService blobStoreService) {
Store store, RecoveryThrottler recoveryThrottler, IndexGateway cloudIndexGateway, CloudBlobStoreService blobStoreService) {
super(shardId, indexSettings);
this.indexShard = (InternalIndexShard) indexShard;
this.threadPool = threadPool;
@@ -108,11 +109,11 @@ public class CloudIndexShardGateway extends AbstractIndexShardComponent implemen
this.store = store;
this.blobStoreContext = blobStoreService.context();
this.chunkSize = cloudIndexGateway.chunkSize();
this.shardLocation = cloudIndexGateway.indexLocation();
this.container = cloudIndexGateway.indexContainer();
this.chunkSize = ((CloudIndexGateway) cloudIndexGateway).chunkSize();
this.shardLocation = ((CloudIndexGateway) cloudIndexGateway).indexLocation();
this.container = ((CloudIndexGateway) cloudIndexGateway).indexContainer();
this.shardDirectory = cloudIndexGateway.indexDirectory() + "/" + shardId.id();
this.shardDirectory = ((CloudIndexGateway) cloudIndexGateway).indexDirectory() + "/" + shardId.id();
this.shardIndexDirectory = shardDirectory + "/index";
this.shardTranslogDirectory = shardDirectory + "/translog";

144
plugins/hadoop/build.gradle Normal file

@@ -0,0 +1,144 @@
dependsOn(':elasticsearch')
apply plugin: 'java'
apply plugin: 'maven'
archivesBaseName = "elasticsearch-hadoop"
explodedDistDir = new File(distsDir, 'exploded')
configurations.compile.transitive = true
configurations.testCompile.transitive = true
// no need to use the resource dir
sourceSets.main.resources.srcDirs 'src/main/java'
sourceSets.test.resources.srcDirs 'src/test/java'
// add the source files to the dist jar
//jar {
// from sourceSets.main.allJava
//}
configurations {
dists
distLib {
visible = false
}
}
dependencies {
compile project(':elasticsearch')
compile("org.apache.hadoop:hadoop-core:0.20.2") { transitive = false }
runtime("commons-logging:commons-logging:1.1.1") { transitive = false }
distLib("org.apache.hadoop:hadoop-core:0.20.2") { transitive = false }
distLib("commons-logging:commons-logging:1.1.1") { transitive = false }
testCompile project(':test-testng')
testCompile('org.testng:testng:5.10:jdk15') { transitive = false }
}
test {
useTestNG()
jvmArgs = ["-ea", "-Xmx1024m"]
suiteName = project.name
listeners = ["org.elasticsearch.util.testng.Listeners"]
systemProperties["es.test.log.conf"] = System.getProperty("es.test.log.conf", "log4j-gradle.properties")
}
task explodedDist(dependsOn: [jar], description: 'Builds the plugin zip file') << {
[explodedDistDir]*.mkdirs()
copy {
from configurations.distLib
into explodedDistDir
}
// remove elasticsearch files (compile above adds the elasticsearch one)
ant.delete { fileset(dir: explodedDistDir, includes: "elasticsearch-*.jar") }
copy {
from libsDir
into explodedDistDir
}
ant.delete { fileset(dir: explodedDistDir, includes: "elasticsearch-*-javadoc.jar") }
ant.delete { fileset(dir: explodedDistDir, includes: "elasticsearch-*-sources.jar") }
}
task zip(type: Zip, dependsOn: ['explodedDist']) {
from(explodedDistDir) {
}
}
task release(dependsOn: [zip]) << {
ant.delete(dir: explodedDistDir)
copy {
from distsDir
into(new File(rootProject.distsDir, "plugins"))
}
}
configurations {
deployerJars
}
dependencies {
deployerJars "org.apache.maven.wagon:wagon-http:1.0-beta-2"
}
task sourcesJar(type: Jar, dependsOn: classes) {
classifier = 'sources'
from sourceSets.main.allSource
}
task javadocJar(type: Jar, dependsOn: javadoc) {
classifier = 'javadoc'
from javadoc.destinationDir
}
jar {
// from sourceSets.main.allJava
manifest {
attributes("Implementation-Title": "ElasticSearch", "Implementation-Version": rootProject.version, "Implementation-Date": buildTimeStr)
}
}
artifacts {
archives sourcesJar
archives javadocJar
}
uploadArchives {
repositories.mavenDeployer {
configuration = configurations.deployerJars
repository(url: rootProject.mavenRepoUrl) {
authentication(userName: rootProject.mavenRepoUser, password: rootProject.mavenRepoPass)
}
snapshotRepository(url: rootProject.mavenSnapshotRepoUrl) {
authentication(userName: rootProject.mavenRepoUser, password: rootProject.mavenRepoPass)
}
pom.project {
inceptionYear '2009'
name 'elasticsearch-plugins-hadoop'
description 'Hadoop Plugin for ElasticSearch'
licenses {
license {
name 'The Apache Software License, Version 2.0'
url 'http://www.apache.org/licenses/LICENSE-2.0.txt'
distribution 'repo'
}
}
scm {
connection 'git://github.com/elasticsearch/elasticsearch.git'
developerConnection 'git@github.com:elasticsearch/elasticsearch.git'
url 'http://github.com/elasticsearch/elasticsearch'
}
}
pom.whenConfigured {pom ->
pom.dependencies = pom.dependencies.findAll {dep -> dep.scope != 'test' } // removes the test scoped ones
}
}
}

@@ -0,0 +1 @@
plugin=org.elasticsearch.plugin.hadoop.HadoopPlugin

@@ -0,0 +1,203 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.gateway.hdfs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.ElasticSearchIllegalArgumentException;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.gateway.Gateway;
import org.elasticsearch.gateway.GatewayException;
import org.elasticsearch.util.component.AbstractLifecycleComponent;
import org.elasticsearch.util.inject.Inject;
import org.elasticsearch.util.inject.Module;
import org.elasticsearch.util.settings.Settings;
import org.elasticsearch.util.xcontent.ToXContent;
import org.elasticsearch.util.xcontent.XContentFactory;
import org.elasticsearch.util.xcontent.XContentParser;
import org.elasticsearch.util.xcontent.XContentType;
import org.elasticsearch.util.xcontent.builder.BinaryXContentBuilder;
import java.io.IOException;
import java.util.Map;
/**
* @author kimchy (shay.banon)
*/
public class HdfsGateway extends AbstractLifecycleComponent<Gateway> implements Gateway {
private final FileSystem fileSystem;
private final Path path;
private final Path metaDataPath;
private volatile int currentIndex;
@Inject public HdfsGateway(Settings settings, ClusterName clusterName) throws IOException {
super(settings);
String path = componentSettings.get("path");
if (path == null) {
throw new ElasticSearchIllegalArgumentException("hdfs gateway requires the 'path' path setting to be set");
}
this.path = new Path(new Path(path), clusterName.value());
this.metaDataPath = new Path(this.path, "metadata");
Configuration conf = new Configuration();
Settings hdfsSettings = settings.getByPrefix("hdfs.conf.");
for (Map.Entry<String, String> entry : hdfsSettings.getAsMap().entrySet()) {
conf.set(entry.getKey(), entry.getValue());
}
fileSystem = FileSystem.get(conf);
fileSystem.mkdirs(metaDataPath);
this.currentIndex = findLatestIndex();
logger.debug("Latest metadata found at index [" + currentIndex + "]");
}
public FileSystem fileSystem() {
return this.fileSystem;
}
public Path path() {
return this.path;
}
@Override protected void doStart() throws ElasticSearchException {
}
@Override protected void doStop() throws ElasticSearchException {
}
@Override protected void doClose() throws ElasticSearchException {
try {
fileSystem.close();
} catch (IOException e) {
logger.warn("Failed to close file system {}", fileSystem);
}
}
@Override public void write(MetaData metaData) throws GatewayException {
try {
final Path file = new Path(metaDataPath, "metadata-" + (currentIndex + 1));
BinaryXContentBuilder builder = XContentFactory.contentBinaryBuilder(XContentType.JSON);
builder.prettyPrint();
builder.startObject();
MetaData.Builder.toXContent(metaData, builder, ToXContent.EMPTY_PARAMS);
builder.endObject();
FSDataOutputStream fileStream = fileSystem.create(file, true);
fileStream.write(builder.unsafeBytes(), 0, builder.unsafeBytesLength());
fileStream.flush();
fileStream.sync();
fileStream.close();
currentIndex++;
FileStatus[] oldFiles = fileSystem.listStatus(metaDataPath, new PathFilter() {
@Override public boolean accept(Path path) {
return path.getName().startsWith("metadata-") && !path.getName().equals(file.getName());
}
});
for (FileStatus oldFile : oldFiles) {
fileSystem.delete(oldFile.getPath(), false);
}
} catch (IOException e) {
throw new GatewayException("can't write new metadata file into the gateway", e);
}
}
@Override public MetaData read() throws GatewayException {
try {
if (currentIndex == -1)
return null;
Path file = new Path(metaDataPath, "metadata-" + currentIndex);
return readMetaData(file);
} catch (GatewayException e) {
throw e;
} catch (Exception e) {
throw new GatewayException("can't read metadata file from the gateway", e);
}
}
@Override public Class<? extends Module> suggestIndexGateway() {
return HdfsIndexGatewayModule.class;
}
@Override public void reset() throws IOException {
fileSystem.delete(path, true);
}
private int findLatestIndex() throws IOException {
FileStatus[] files = fileSystem.listStatus(metaDataPath, new PathFilter() {
@Override public boolean accept(Path path) {
return path.getName().startsWith("metadata-");
}
});
int index = -1;
for (FileStatus file : files) {
if (logger.isTraceEnabled()) {
logger.trace("[findLatestMetadata]: Processing file [" + file + "]");
}
String name = file.getPath().getName();
int fileIndex = Integer.parseInt(name.substring(name.indexOf('-') + 1));
if (fileIndex >= index) {
// try and read the meta data
try {
readMetaData(file.getPath());
index = fileIndex;
} catch (IOException e) {
logger.warn("[findLatestMetadata]: Failed to read metadata from [" + file + "], ignoring...", e);
}
}
}
return index;
}
private MetaData readMetaData(Path file) throws IOException {
FSDataInputStream fileStream = fileSystem.open(file);
XContentParser parser = null;
try {
parser = XContentFactory.xContent(XContentType.JSON).createParser(fileStream);
return MetaData.Builder.fromXContent(parser, settings);
} finally {
if (parser != null) {
parser.close();
}
try {
fileStream.close();
} catch (Exception e) {
// ignore
}
}
}
}

@@ -0,0 +1,33 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.gateway.hdfs;
import org.elasticsearch.gateway.Gateway;
import org.elasticsearch.util.inject.AbstractModule;
/**
* @author kimchy (shay.banon)
*/
public class HdfsGatewayModule extends AbstractModule {
@Override protected void configure() {
bind(Gateway.class).to(HdfsGateway.class).asEagerSingleton();
}
}

@@ -0,0 +1,34 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.gateway.hdfs;
import org.elasticsearch.index.gateway.IndexGateway;
import org.elasticsearch.index.gateway.hdfs.HdfsIndexGateway;
import org.elasticsearch.util.inject.AbstractModule;
/**
* @author kimchy (Shay Banon)
*/
public class HdfsIndexGatewayModule extends AbstractModule {
@Override protected void configure() {
bind(IndexGateway.class).to(HdfsIndexGateway.class).asEagerSingleton();
}
}

@@ -0,0 +1,88 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.gateway.hdfs;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.ElasticSearchIllegalArgumentException;
import org.elasticsearch.gateway.Gateway;
import org.elasticsearch.gateway.hdfs.HdfsGateway;
import org.elasticsearch.index.AbstractIndexComponent;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.gateway.IndexGateway;
import org.elasticsearch.index.gateway.IndexShardGateway;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.util.inject.Inject;
import org.elasticsearch.util.settings.Settings;
import java.io.IOException;
/**
* @author kimchy (shay.banon)
*/
public class HdfsIndexGateway extends AbstractIndexComponent implements IndexGateway {
private final FileSystem fileSystem;
private final Path indexPath;
@Inject public HdfsIndexGateway(Index index, @IndexSettings Settings indexSettings, Gateway gateway) {
super(index, indexSettings);
Path path = null;
String pathSetting = componentSettings.get("path");
if (pathSetting != null) {
path = new Path(pathSetting);
}
if (gateway instanceof HdfsGateway) {
HdfsGateway hdfsGateway = (HdfsGateway) gateway;
fileSystem = hdfsGateway.fileSystem();
if (path == null) {
path = hdfsGateway.path();
}
} else {
throw new ElasticSearchIllegalArgumentException("Must configure an hdfs gateway to use index hdfs gateway");
}
this.indexPath = new Path(new Path(path, "indices"), index.name());
}
public FileSystem fileSystem() {
return this.fileSystem;
}
public Path indexPath() {
return this.indexPath;
}
@Override public Class<? extends IndexShardGateway> shardGatewayClass() {
return HdfsIndexShardGateway.class;
}
@Override public void close(boolean delete) throws ElasticSearchException {
if (delete) {
try {
fileSystem.delete(indexPath, true);
} catch (IOException e) {
logger.warn("Failed to delete [{}]", e, indexPath);
}
}
}
}

@@ -0,0 +1,430 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.gateway.hdfs;
import org.apache.hadoop.fs.*;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.store.IndexInput;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit;
import org.elasticsearch.index.gateway.IndexGateway;
import org.elasticsearch.index.gateway.IndexShardGateway;
import org.elasticsearch.index.gateway.IndexShardGatewayRecoveryException;
import org.elasticsearch.index.gateway.IndexShardGatewaySnapshotFailedException;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.index.shard.service.InternalIndexShard;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.indices.recovery.throttler.RecoveryThrottler;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.util.SizeUnit;
import org.elasticsearch.util.SizeValue;
import org.elasticsearch.util.TimeValue;
import org.elasticsearch.util.collect.Lists;
import org.elasticsearch.util.inject.Inject;
import org.elasticsearch.util.io.stream.DataInputStreamInput;
import org.elasticsearch.util.io.stream.DataOutputStreamOutput;
import org.elasticsearch.util.io.stream.StreamOutput;
import org.elasticsearch.util.lucene.Directories;
import org.elasticsearch.util.settings.Settings;
import java.io.EOFException;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import static org.elasticsearch.index.translog.TranslogStreams.*;
import static org.elasticsearch.util.lucene.Directories.*;
/**
* @author kimchy (shay.banon)
*/
public class HdfsIndexShardGateway extends AbstractIndexShardComponent implements IndexShardGateway {
private final InternalIndexShard indexShard;
private final ThreadPool threadPool;
private final RecoveryThrottler recoveryThrottler;
private final Store store;
private final FileSystem fileSystem;
private final Path path;
private final Path indexPath;
private final Path translogPath;
private volatile FSDataOutputStream currentTranslogStream = null;
@Inject public HdfsIndexShardGateway(ShardId shardId, @IndexSettings Settings indexSettings, ThreadPool threadPool, IndexGateway hdfsIndexGateway,
IndexShard indexShard, Store store, RecoveryThrottler recoveryThrottler) {
super(shardId, indexSettings);
this.indexShard = (InternalIndexShard) indexShard;
this.threadPool = threadPool;
this.recoveryThrottler = recoveryThrottler;
this.store = store;
this.fileSystem = ((HdfsIndexGateway) hdfsIndexGateway).fileSystem();
this.path = new Path(((HdfsIndexGateway) hdfsIndexGateway).indexPath(), Integer.toString(shardId.id()));
this.indexPath = new Path(path, "index");
this.translogPath = new Path(path, "translog");
}
@Override public void close(boolean delete) throws ElasticSearchException {
if (currentTranslogStream != null) {
try {
currentTranslogStream.close();
} catch (IOException e) {
// ignore
}
}
if (delete) {
try {
fileSystem.delete(path, true);
} catch (IOException e) {
logger.warn("Failed to delete [{}]", e, path);
}
}
}
@Override public boolean requiresSnapshotScheduling() {
return true;
}
@Override public RecoveryStatus recover() throws IndexShardGatewayRecoveryException {
RecoveryStatus.Index recoveryStatusIndex = recoverIndex();
RecoveryStatus.Translog recoveryStatusTranslog = recoverTranslog();
return new RecoveryStatus(recoveryStatusIndex, recoveryStatusTranslog);
}
@Override public SnapshotStatus snapshot(Snapshot snapshot) {
long totalTimeStart = System.currentTimeMillis();
boolean indexDirty = false;
boolean translogDirty = false;
final SnapshotIndexCommit snapshotIndexCommit = snapshot.indexCommit();
final Translog.Snapshot translogSnapshot = snapshot.translogSnapshot();
int indexNumberOfFiles = 0;
long indexTotalFilesSize = 0;
long indexTime = 0;
if (snapshot.indexChanged()) {
long time = System.currentTimeMillis();
indexDirty = true;
// snapshot into the index
final CountDownLatch latch = new CountDownLatch(snapshotIndexCommit.getFiles().length);
final AtomicReference<Exception> lastException = new AtomicReference<Exception>();
for (final String fileName : snapshotIndexCommit.getFiles()) {
// don't copy over the segments file, it will be copied over later on as part of the
// final snapshot phase
if (fileName.equals(snapshotIndexCommit.getSegmentsFileName())) {
latch.countDown();
continue;
}
IndexInput indexInput = null;
try {
indexInput = snapshotIndexCommit.getDirectory().openInput(fileName);
FileStatus fileStatus = fileSystem.getFileStatus(new Path(indexPath, fileName));
if (fileStatus.getLen() == indexInput.length()) {
// we assume its the same one, no need to copy
latch.countDown();
continue;
}
} catch (FileNotFoundException e) {
// that's fine!
} catch (Exception e) {
logger.debug("Failed to verify file equality based on length, copying...", e);
} finally {
if (indexInput != null) {
try {
indexInput.close();
} catch (IOException e) {
// ignore
}
}
}
indexNumberOfFiles++;
try {
indexTotalFilesSize += snapshotIndexCommit.getDirectory().fileLength(fileName);
} catch (IOException e) {
// ignore...
}
threadPool.execute(new Runnable() {
@Override public void run() {
Path copyTo = new Path(indexPath, fileName);
FSDataOutputStream fileStream;
try {
fileStream = fileSystem.create(copyTo, true);
copyFromDirectory(snapshotIndexCommit.getDirectory(), fileName, fileStream);
} catch (Exception e) {
lastException.set(new IndexShardGatewaySnapshotFailedException(shardId, "Failed to copy to [" + copyTo + "], from dir [" + snapshotIndexCommit.getDirectory() + "] and file [" + fileName + "]", e));
} finally {
latch.countDown();
}
}
});
}
try {
latch.await();
} catch (InterruptedException e) {
lastException.set(e);
}
if (lastException.get() != null) {
throw new IndexShardGatewaySnapshotFailedException(shardId(), "Failed to perform snapshot (index files)", lastException.get());
}
indexTime = System.currentTimeMillis() - time;
}
int translogNumberOfOperations = 0;
long translogTime = 0;
if (snapshot.newTranslogCreated() || currentTranslogStream == null) {
translogDirty = true;
long time = System.currentTimeMillis();
// a new translog, close the current stream
if (currentTranslogStream != null) {
try {
currentTranslogStream.close();
} catch (IOException e) {
// ignore
}
}
Path currentTranslogPath = new Path(translogPath, "translog-" + translogSnapshot.translogId());
try {
currentTranslogStream = fileSystem.create(currentTranslogPath, true);
StreamOutput out = new DataOutputStreamOutput(currentTranslogStream);
for (Translog.Operation operation : translogSnapshot) {
translogNumberOfOperations++;
writeTranslogOperation(out, operation);
}
currentTranslogStream.flush();
currentTranslogStream.sync();
} catch (Exception e) {
currentTranslogPath = null;
if (currentTranslogStream != null) {
try {
currentTranslogStream.close();
} catch (IOException e1) {
// ignore
} finally {
currentTranslogStream = null;
}
}
throw new IndexShardGatewaySnapshotFailedException(shardId(), "Failed to snapshot translog into [" + currentTranslogPath + "]", e);
}
translogTime = System.currentTimeMillis() - time;
} else if (snapshot.sameTranslogNewOperations()) {
translogDirty = true;
long time = System.currentTimeMillis();
try {
StreamOutput out = new DataOutputStreamOutput(currentTranslogStream);
for (Translog.Operation operation : translogSnapshot.skipTo(snapshot.lastTranslogSize())) {
translogNumberOfOperations++;
writeTranslogOperation(out, operation);
}
} catch (Exception e) {
try {
currentTranslogStream.close();
} catch (IOException e1) {
// ignore
} finally {
currentTranslogStream = null;
}
throw new IndexShardGatewaySnapshotFailedException(shardId(), "Failed to append operations to the current translog snapshot", e);
}
translogTime = System.currentTimeMillis() - time;
}
// now write the segments file and update the translog header
if (indexDirty) {
Path segmentsPath = new Path(indexPath, snapshotIndexCommit.getSegmentsFileName());
try {
indexNumberOfFiles++;
indexTotalFilesSize += snapshotIndexCommit.getDirectory().fileLength(snapshotIndexCommit.getSegmentsFileName());
long time = System.currentTimeMillis();
FSDataOutputStream fileStream;
fileStream = fileSystem.create(segmentsPath, true);
copyFromDirectory(snapshotIndexCommit.getDirectory(), snapshotIndexCommit.getSegmentsFileName(), fileStream);
indexTime += (System.currentTimeMillis() - time);
} catch (Exception e) {
throw new IndexShardGatewaySnapshotFailedException(shardId(), "Failed to finalize index snapshot into [" + segmentsPath + "]", e);
}
}
// delete the old translog
if (snapshot.newTranslogCreated()) {
try {
fileSystem.delete(new Path(translogPath, "translog-" + snapshot.lastTranslogId()), false);
} catch (IOException e) {
// ignore
}
}
// delete files that no longer exists in the index
if (indexDirty) {
try {
FileStatus[] existingFiles = fileSystem.listStatus(indexPath);
for (FileStatus existingFile : existingFiles) {
boolean found = false;
for (final String fileName : snapshotIndexCommit.getFiles()) {
if (existingFile.getPath().getName().equals(fileName)) {
found = true;
break;
}
}
if (!found) {
fileSystem.delete(existingFile.getPath(), false);
}
}
} catch (Exception e) {
// no worries, failed to clean old ones, will clean them later
}
}
return new SnapshotStatus(new TimeValue(System.currentTimeMillis() - totalTimeStart),
new SnapshotStatus.Index(indexNumberOfFiles, new SizeValue(indexTotalFilesSize), new TimeValue(indexTime)),
new SnapshotStatus.Translog(translogNumberOfOperations, new TimeValue(translogTime)));
}
private RecoveryStatus.Index recoverIndex() throws IndexShardGatewayRecoveryException {
FileStatus[] files;
try {
files = fileSystem.listStatus(indexPath);
} catch (IOException e) {
throw new IndexShardGatewayRecoveryException(shardId(), "Failed to list files", e);
}
final CountDownLatch latch = new CountDownLatch(files.length);
final AtomicReference<Exception> lastException = new AtomicReference<Exception>();
final AtomicLong throttlingWaitTime = new AtomicLong();
for (final FileStatus file : files) {
threadPool.execute(new Runnable() {
@Override public void run() {
try {
long throttlingStartTime = System.currentTimeMillis();
while (!recoveryThrottler.tryStream(shardId, file.getPath().getName())) {
Thread.sleep(recoveryThrottler.throttleInterval().millis());
}
throttlingWaitTime.addAndGet(System.currentTimeMillis() - throttlingStartTime);
FSDataInputStream fileStream = fileSystem.open(file.getPath());
Directories.copyToDirectory(fileStream, store.directory(), file.getPath().getName());
fileStream.close();
} catch (Exception e) {
logger.debug("Failed to read [" + file + "] into [" + store + "]", e);
lastException.set(e);
} finally {
recoveryThrottler.streamDone(shardId, file.getPath().getName());
latch.countDown();
}
}
});
}
try {
latch.await();
} catch (InterruptedException e) {
lastException.set(e);
}
if (lastException.get() != null) {
throw new IndexShardGatewayRecoveryException(shardId(), "Failed to recover index files", lastException.get());
}
long totalSize = 0;
for (FileStatus file : files) {
totalSize += file.getLen();
}
long version = -1;
try {
if (IndexReader.indexExists(store.directory())) {
version = IndexReader.getCurrentVersion(store.directory());
}
} catch (IOException e) {
throw new IndexShardGatewayRecoveryException(shardId(), "Failed to fetch index version after copying it over", e);
}
return new RecoveryStatus.Index(version, files.length, new SizeValue(totalSize, SizeUnit.BYTES), TimeValue.timeValueMillis(throttlingWaitTime.get()));
}
private RecoveryStatus.Translog recoverTranslog() throws IndexShardGatewayRecoveryException {
FSDataInputStream fileStream = null;
try {
long recoveryTranslogId = findLatestTranslogId();
if (recoveryTranslogId == -1) {
// no recovery file found, start the shard and bail
indexShard.start();
return new RecoveryStatus.Translog(-1, 0, new SizeValue(0, SizeUnit.BYTES));
}
FileStatus status = fileSystem.getFileStatus(new Path(translogPath, "translog-" + recoveryTranslogId));
fileStream = fileSystem.open(status.getPath());
ArrayList<Translog.Operation> operations = Lists.newArrayList();
for (; ;) {
try {
operations.add(readTranslogOperation(new DataInputStreamInput(fileStream)));
} catch (EOFException e) {
// reached end of stream
break;
}
}
indexShard.performRecovery(operations);
return new RecoveryStatus.Translog(recoveryTranslogId, operations.size(), new SizeValue(status.getLen(), SizeUnit.BYTES));
} catch (Exception e) {
throw new IndexShardGatewayRecoveryException(shardId(), "Failed to perform recovery of translog", e);
} finally {
if (fileStream != null) {
try {
fileStream.close();
} catch (IOException e) {
// ignore
}
}
}
}
private long findLatestTranslogId() throws IOException {
FileStatus[] files = fileSystem.listStatus(translogPath, new PathFilter() {
@Override public boolean accept(Path path) {
return path.getName().startsWith("translog-");
}
});
if (files == null) {
return -1;
}
long index = -1;
for (FileStatus file : files) {
String name = file.getPath().getName();
long fileIndex = Long.parseLong(name.substring(name.indexOf('-') + 1));
if (fileIndex >= index) {
index = fileIndex;
}
}
return index;
}
}
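Taken together, the classes above lay the gateway out on the file system as follows, where <path> is the gateway.hdfs.path setting (the per-index base can be overridden with the index gateway's own path setting in HdfsIndexGateway):

<path>/<cluster name>/metadata/metadata-<N>                               (cluster meta data, HdfsGateway)
<path>/<cluster name>/indices/<index>/<shard id>/index/<lucene files>     (index snapshot, HdfsIndexShardGateway)
<path>/<cluster name>/indices/<index>/<shard id>/translog/translog-<id>   (translog snapshot, HdfsIndexShardGateway)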

@@ -0,0 +1,36 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.plugin.hadoop;
import org.elasticsearch.plugins.AbstractPlugin;
/**
* @author kimchy (shay.banon)
*/
public class HadoopPlugin extends AbstractPlugin {
@Override public String name() {
return "hadoop";
}
@Override public String description() {
return "Hadoop Plugin";
}
}

@@ -0,0 +1,206 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.hadoop.gateway;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.client.Requests;
import org.elasticsearch.gateway.Gateway;
import org.elasticsearch.indices.IndexAlreadyExistsException;
import org.elasticsearch.node.Node;
import org.elasticsearch.node.internal.InternalNode;
import org.elasticsearch.util.logging.ESLogger;
import org.elasticsearch.util.logging.Loggers;
import org.elasticsearch.util.settings.Settings;
import org.testng.annotations.AfterTest;
import org.testng.annotations.BeforeTest;
import org.testng.annotations.Test;
import static org.elasticsearch.client.Requests.*;
import static org.elasticsearch.node.NodeBuilder.*;
import static org.elasticsearch.util.settings.ImmutableSettings.*;
import static org.hamcrest.MatcherAssert.*;
import static org.hamcrest.Matchers.*;
/**
* @author kimchy (shay.banon)
*/
public class HdfsGatewayTests {
protected final ESLogger logger = Loggers.getLogger(getClass());
private Node node;
@BeforeTest void setUpNodes() throws Exception {
node = buildNode();
((InternalNode) node).injector().getInstance(Gateway.class).reset();
node.start();
}
private Node buildNode() {
Settings settings = settingsBuilder()
// .put("hdfs.conf.fs.default.name", "file://work")
.put("gateway.type", "hdfs")
.put("gateway.hdfs.path", "work/hdfs/gateway")
.build();
return nodeBuilder().settings(settingsBuilder().put(settings).put("node.name", "node1")).build();
}
@AfterTest void closeNodes() throws Exception {
((InternalNode) node).injector().getInstance(Gateway.class).reset();
node.close();
}
@Test public void testHdfsGateway() {
// first, test meta data
node.client().admin().indices().create(createIndexRequest("test")).actionGet();
node.close();
node = buildNode().start();
try {
node.client().admin().indices().create(createIndexRequest("test")).actionGet();
assert false : "index should exist";
} catch (IndexAlreadyExistsException e) {
// all is well
}
// Translog tests
// create a mapping
PutMappingResponse putMappingResponse = node.client().admin().indices().putMapping(putMappingRequest("test").type("type1")
.source(mappingSource())).actionGet();
assertThat(putMappingResponse.acknowledged(), equalTo(true));
// verify that mapping is there
ClusterStateResponse clusterState = node.client().admin().cluster().state(clusterState()).actionGet();
assertThat(clusterState.state().metaData().index("test").mapping("type1"), notNullValue());
// create two and delete the first
logger.info("Indexing #1");
node.client().index(Requests.indexRequest("test").type("type1").id("1").source(source("1", "test"))).actionGet();
logger.info("Indexing #2");
node.client().index(Requests.indexRequest("test").type("type1").id("2").source(source("2", "test"))).actionGet();
logger.info("Deleting #1");
node.client().delete(deleteRequest("test").type("type1").id("1")).actionGet();
// perform snapshot to the index
logger.info("Gateway Snapshot");
node.client().admin().indices().gatewaySnapshot(gatewaySnapshotRequest("test")).actionGet();
logger.info("Gateway Snapshot (should be a no op)");
// do it again, it should be a no op
node.client().admin().indices().gatewaySnapshot(gatewaySnapshotRequest("test")).actionGet();
logger.info("Closing the server");
node.close();
logger.info("Starting the server, should recover from the gateway (only translog should be populated)");
node = buildNode().start();
logger.info("Running Cluster Health (wait for the shards to startup)");
ClusterHealthResponse clusterHealth = node.client().admin().cluster().health(clusterHealth().waitForYellowStatus()).actionGet();
logger.info("Done Cluster Health, status " + clusterHealth.status());
assertThat(clusterHealth.timedOut(), equalTo(false));
assertThat(clusterHealth.status(), equalTo(ClusterHealthStatus.YELLOW));
// verify that mapping is there
clusterState = node.client().admin().cluster().state(clusterState()).actionGet();
assertThat(clusterState.state().metaData().index("test").mapping("type1"), notNullValue());
logger.info("Getting #1, should not exists");
GetResponse getResponse = node.client().get(getRequest("test").type("type1").id("1")).actionGet();
assertThat(getResponse.exists(), equalTo(false));
logger.info("Getting #2");
getResponse = node.client().get(getRequest("test").type("type1").id("2")).actionGet();
assertThat(getResponse.sourceAsString(), equalTo(source("2", "test")));
// Now flush and add some data (so we have index recovery as well)
logger.info("Flushing, so we have actual content in the index files (#2 should be in the index)");
node.client().admin().indices().flush(flushRequest("test")).actionGet();
logger.info("Indexing #3, so we have something in the translog as well");
node.client().index(Requests.indexRequest("test").type("type1").id("3").source(source("3", "test"))).actionGet();
logger.info("Gateway Snapshot");
node.client().admin().indices().gatewaySnapshot(gatewaySnapshotRequest("test")).actionGet();
logger.info("Gateway Snapshot (should be a no op)");
node.client().admin().indices().gatewaySnapshot(gatewaySnapshotRequest("test")).actionGet();
logger.info("Closing the server");
node.close();
logger.info("Starting the server, should recover from the gateway (both index and translog)");
node = buildNode().start();
logger.info("Running Cluster Health (wait for the shards to startup)");
clusterHealth = node.client().admin().cluster().health(clusterHealth().waitForYellowStatus()).actionGet();
logger.info("Done Cluster Health, status " + clusterHealth.status());
assertThat(clusterHealth.timedOut(), equalTo(false));
assertThat(clusterHealth.status(), equalTo(ClusterHealthStatus.YELLOW));
logger.info("Getting #1, should not exists");
getResponse = node.client().get(getRequest("test").type("type1").id("1")).actionGet();
assertThat(getResponse.exists(), equalTo(false));
logger.info("Getting #2 (not from the translog, but from the index)");
getResponse = node.client().get(getRequest("test").type("type1").id("2")).actionGet();
assertThat(getResponse.sourceAsString(), equalTo(source("2", "test")));
logger.info("Getting #3 (from the translog)");
getResponse = node.client().get(getRequest("test").type("type1").id("3")).actionGet();
assertThat(getResponse.sourceAsString(), equalTo(source("3", "test")));
logger.info("Flushing, so we have actual content in the index files (#3 should be in the index now as well)");
node.client().admin().indices().flush(flushRequest("test")).actionGet();
logger.info("Gateway Snapshot");
node.client().admin().indices().gatewaySnapshot(gatewaySnapshotRequest("test")).actionGet();
logger.info("Gateway Snapshot (should be a no op)");
node.client().admin().indices().gatewaySnapshot(gatewaySnapshotRequest("test")).actionGet();
logger.info("Closing the server");
node.close();
logger.info("Starting the server, should recover from the gateway (just from the index, nothing in the translog)");
node = buildNode().start();
logger.info("Running Cluster Health (wait for the shards to startup)");
clusterHealth = node.client().admin().cluster().health(clusterHealth().waitForYellowStatus()).actionGet();
logger.info("Done Cluster Health, status " + clusterHealth.status());
assertThat(clusterHealth.timedOut(), equalTo(false));
assertThat(clusterHealth.status(), equalTo(ClusterHealthStatus.YELLOW));
logger.info("Getting #1, should not exists");
getResponse = node.client().get(getRequest("test").type("type1").id("1")).actionGet();
assertThat(getResponse.exists(), equalTo(false));
logger.info("Getting #2 (not from the translog, but from the index)");
getResponse = node.client().get(getRequest("test").type("type1").id("2")).actionGet();
assertThat(getResponse.sourceAsString(), equalTo(source("2", "test")));
logger.info("Getting #3 (not from the translog, but from the index)");
getResponse = node.client().get(getRequest("test").type("type1").id("3")).actionGet();
assertThat(getResponse.sourceAsString(), equalTo(source("3", "test")));
logger.info("Deleting the index");
node.client().admin().indices().delete(deleteIndexRequest("test")).actionGet();
}
private String mappingSource() {
return "{ type1 : { properties : { name : { type : \"string\" } } } }";
}
private String source(String id, String nameValue) {
return "{ type1 : { \"id\" : \"" + id + "\", \"name\" : \"" + nameValue + "\" } }";
}
}

@@ -7,6 +7,7 @@ include 'test-integration'
include 'benchmark-micro'
include 'plugins-cloud'
include 'plugins-hadoop'
include 'plugins-analysis-icu'
include 'plugins-mapper-attachments'
include 'plugins-client-groovy'