Cleanup test framework in order to release it as a jar file

This commit adds javadocs and removed unused methods from central
classes like ElasticsearchIntegrationTest. It also changes visibility
of many methods and classes that are only needed inside the test infrastructure.
This commit is contained in:
Simon Willnauer 2013-11-11 18:07:18 +01:00
parent 2afdb4c8e7
commit 16ee742682
48 changed files with 798 additions and 497 deletions

View File

@ -987,6 +987,7 @@
<configuration>
<includes>
<include>org/elasticsearch/test/**/*</include>
<include>org/apache/lucene/util/AbstractRandomizedTest.class</include>
</includes>
</configuration>
</execution>

View File

@ -180,7 +180,7 @@ public class AckTests extends ElasticsearchIntegrationTest {
public void testClusterRerouteAcknowledgement() throws InterruptedException {
client().admin().indices().prepareCreate("test")
.setSettings(settingsBuilder()
.put("number_of_shards", atLeast(cluster().numNodes()))
.put("number_of_shards", atLeast(cluster().size()))
.put("number_of_replicas", 0)).get();
ensureGreen();
@ -220,7 +220,7 @@ public class AckTests extends ElasticsearchIntegrationTest {
public void testClusterRerouteNoAcknowledgement() throws InterruptedException {
client().admin().indices().prepareCreate("test")
.setSettings(settingsBuilder()
.put("number_of_shards", atLeast(cluster().numNodes()))
.put("number_of_shards", atLeast(cluster().size()))
.put("number_of_replicas", 0)).get();
ensureGreen();
@ -234,7 +234,7 @@ public class AckTests extends ElasticsearchIntegrationTest {
public void testClusterRerouteAcknowledgementDryRun() throws InterruptedException {
client().admin().indices().prepareCreate("test")
.setSettings(settingsBuilder()
.put("number_of_shards", atLeast(cluster().numNodes()))
.put("number_of_shards", atLeast(cluster().size()))
.put("number_of_replicas", 0)).get();
ensureGreen();
@ -270,7 +270,7 @@ public class AckTests extends ElasticsearchIntegrationTest {
public void testClusterRerouteNoAcknowledgementDryRun() throws InterruptedException {
client().admin().indices().prepareCreate("test")
.setSettings(settingsBuilder()
.put("number_of_shards", atLeast(cluster().numNodes()))
.put("number_of_shards", atLeast(cluster().size()))
.put("number_of_replicas", 0)).get();
ensureGreen();
@ -313,7 +313,7 @@ public class AckTests extends ElasticsearchIntegrationTest {
public void testClusterUpdateSettingsAcknowledgement() {
client().admin().indices().prepareCreate("test")
.setSettings(settingsBuilder()
.put("number_of_shards", atLeast(cluster().numNodes()))
.put("number_of_shards", atLeast(cluster().size()))
.put("number_of_replicas", 0)).get();
ensureGreen();
@ -358,7 +358,7 @@ public class AckTests extends ElasticsearchIntegrationTest {
public void testClusterUpdateSettingsNoAcknowledgement() {
client().admin().indices().prepareCreate("test")
.setSettings(settingsBuilder()
.put("number_of_shards", atLeast(cluster().numNodes()))
.put("number_of_shards", atLeast(cluster().size()))
.put("number_of_replicas", 0)).get();
ensureGreen();

View File

@ -126,7 +126,7 @@ public class ClusterRerouteTests extends ElasticsearchIntegrationTest {
logger.info("--> starting 2 nodes");
String node_1 = cluster().startNode(commonSettings);
cluster().startNode(commonSettings);
assertThat(cluster().numNodes(), equalTo(2));
assertThat(cluster().size(), equalTo(2));
ClusterHealthResponse healthResponse = client().admin().cluster().prepareHealth().setWaitForNodes("2").execute().actionGet();
assertThat(healthResponse.isTimedOut(), equalTo(false));
@ -170,7 +170,7 @@ public class ClusterRerouteTests extends ElasticsearchIntegrationTest {
// wait a bit for the cluster to realize that the shard is not there...
// TODO can we get around this? the cluster is RED, so what do we wait for?
client().admin().cluster().prepareReroute().get();
assertThat(cluster().client().admin().cluster().prepareHealth().setWaitForNodes("2").execute().actionGet().getStatus(), equalTo(ClusterHealthStatus.RED));
assertThat(client().admin().cluster().prepareHealth().setWaitForNodes("2").execute().actionGet().getStatus(), equalTo(ClusterHealthStatus.RED));
logger.info("--> explicitly allocate primary");
state = client().admin().cluster().prepareReroute()
.add(new AllocateAllocationCommand(new ShardId("test", 0), node_1, true))

View File

@ -45,7 +45,7 @@ public class FilteringAllocationTests extends ElasticsearchIntegrationTest {
logger.info("--> starting 2 nodes");
final String node_0 = cluster().startNode();
final String node_1 = cluster().startNode();
assertThat(cluster().numNodes(), equalTo(2));
assertThat(cluster().size(), equalTo(2));
logger.info("--> creating an index with no replicas");
client().admin().indices().prepareCreate("test")
@ -84,7 +84,7 @@ public class FilteringAllocationTests extends ElasticsearchIntegrationTest {
logger.info("--> starting 2 nodes");
final String node_0 = cluster().startNode();
final String node_1 = cluster().startNode();
assertThat(cluster().numNodes(), equalTo(2));
assertThat(cluster().size(), equalTo(2));
logger.info("--> creating an index with no replicas");
client().admin().indices().prepareCreate("test")

View File

@ -58,7 +58,7 @@ public class ShardsAllocatorModuleTests extends ElasticsearchIntegrationTest {
}
private void assertAllocatorInstance(Settings settings, Class<? extends ShardsAllocator> clazz) {
while (cluster().numNodes() != 0) {
while (cluster().size() != 0) {
cluster().stopRandomNode();
}
cluster().startNode(settings);

View File

@ -113,10 +113,10 @@ public class BulkTests extends ElasticsearchIntegrationTest {
public void testBulkVersioning() throws Exception {
createIndex("test");
ensureGreen();
BulkResponse bulkResponse = run(client().prepareBulk()
BulkResponse bulkResponse = client().prepareBulk()
.add(client().prepareIndex("test", "type", "1").setCreate(true).setSource("field", "1"))
.add(client().prepareIndex("test", "type", "2").setCreate(true).setSource("field", "1"))
.add(client().prepareIndex("test", "type", "1").setSource("field", "2")));
.add(client().prepareIndex("test", "type", "1").setSource("field", "2")).get();
assertTrue(((IndexResponse) bulkResponse.getItems()[0].getResponse()).isCreated());
assertThat(((IndexResponse) bulkResponse.getItems()[0].getResponse()).getVersion(), equalTo(1l));
@ -125,19 +125,19 @@ public class BulkTests extends ElasticsearchIntegrationTest {
assertFalse(((IndexResponse) bulkResponse.getItems()[2].getResponse()).isCreated());
assertThat(((IndexResponse) bulkResponse.getItems()[2].getResponse()).getVersion(), equalTo(2l));
bulkResponse = run(client().prepareBulk()
bulkResponse = client().prepareBulk()
.add(client().prepareUpdate("test", "type", "1").setVersion(4l).setDoc("field", "2"))
.add(client().prepareUpdate("test", "type", "2").setDoc("field", "2"))
.add(client().prepareUpdate("test", "type", "1").setVersion(2l).setDoc("field", "3")));
.add(client().prepareUpdate("test", "type", "1").setVersion(2l).setDoc("field", "3")).get();
assertThat(bulkResponse.getItems()[0].getFailureMessage(), containsString("Version"));
assertThat(((UpdateResponse) bulkResponse.getItems()[1].getResponse()).getVersion(), equalTo(2l));
assertThat(((UpdateResponse) bulkResponse.getItems()[2].getResponse()).getVersion(), equalTo(3l));
bulkResponse = run(client().prepareBulk()
bulkResponse = client().prepareBulk()
.add(client().prepareIndex("test", "type", "e1").setCreate(true).setSource("field", "1").setVersion(10).setVersionType(VersionType.EXTERNAL))
.add(client().prepareIndex("test", "type", "e2").setCreate(true).setSource("field", "1").setVersion(10).setVersionType(VersionType.EXTERNAL))
.add(client().prepareIndex("test", "type", "e1").setSource("field", "2").setVersion(12).setVersionType(VersionType.EXTERNAL)));
.add(client().prepareIndex("test", "type", "e1").setSource("field", "2").setVersion(12).setVersionType(VersionType.EXTERNAL)).get();
assertTrue(((IndexResponse) bulkResponse.getItems()[0].getResponse()).isCreated());
assertThat(((IndexResponse) bulkResponse.getItems()[0].getResponse()).getVersion(), equalTo(10l));
@ -146,10 +146,10 @@ public class BulkTests extends ElasticsearchIntegrationTest {
assertFalse(((IndexResponse) bulkResponse.getItems()[2].getResponse()).isCreated());
assertThat(((IndexResponse) bulkResponse.getItems()[2].getResponse()).getVersion(), equalTo(12l));
bulkResponse = run(client().prepareBulk()
bulkResponse = client().prepareBulk()
.add(client().prepareUpdate("test", "type", "e1").setVersion(4l).setDoc("field", "2").setVersion(10).setVersionType(VersionType.EXTERNAL))
.add(client().prepareUpdate("test", "type", "e2").setDoc("field", "2").setVersion(15).setVersionType(VersionType.EXTERNAL))
.add(client().prepareUpdate("test", "type", "e1").setVersion(2l).setDoc("field", "3").setVersion(15).setVersionType(VersionType.EXTERNAL)));
.add(client().prepareUpdate("test", "type", "e1").setVersion(2l).setDoc("field", "3").setVersion(15).setVersionType(VersionType.EXTERNAL)).get();
assertThat(bulkResponse.getItems()[0].getFailureMessage(), containsString("Version"));
assertThat(((UpdateResponse) bulkResponse.getItems()[1].getResponse()).getVersion(), equalTo(15l));

View File

@ -54,7 +54,7 @@ import static org.hamcrest.Matchers.nullValue;
public class DocumentActionsTests extends ElasticsearchIntegrationTest {
protected void createIndex() {
wipeIndex(getConcreteIndexName());
wipeIndices(getConcreteIndexName());
createIndex(getConcreteIndexName());
}

View File

@ -56,7 +56,7 @@ public class RecoverAfterNodesTests extends ElasticsearchIntegrationTest {
public Client startNode(Settings.Builder settings) {
String name = cluster().startNode(settings);
return cluster().clientNodeClient(name);
return cluster().client(name);
}
@Test

View File

@ -23,6 +23,7 @@ import com.google.common.collect.Maps;
import org.apache.lucene.analysis.Analyzer;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.indices.analysis.PreBuiltAnalyzers;
@ -72,7 +73,7 @@ public class PreBuiltAnalyzerIntegrationTests extends ElasticsearchIntegrationTe
.endObject()
.endObject();
Settings versionSettings = randomSettingsBuilder().put(IndexMetaData.SETTING_VERSION_CREATED, randomVersion).build();
Settings versionSettings = ImmutableSettings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, randomVersion).build();
client().admin().indices().prepareCreate(indexName).addMapping("type", mapping).setSettings(versionSettings).get();
}

View File

@ -52,7 +52,7 @@ public class ConcurrentDynamicTemplateTests extends ElasticsearchIntegrationTest
int iters = atLeast(5);
for (int i = 0; i < iters; i++) {
wipeIndex("test");
wipeIndices("test");
client().admin().indices().prepareCreate("test")
.setSettings(
ImmutableSettings.settingsBuilder()

View File

@ -285,7 +285,7 @@ public class UpdateMappingTests extends ElasticsearchIntegrationTest {
@Test
public void updateMappingConcurrently() throws Throwable {
// Test that we can concurrently update different indexes and types.
int shardNo = Math.max(5, cluster().numNodes());
int shardNo = Math.max(5, cluster().size());
prepareCreate("test1").setSettings("index.number_of_shards", shardNo).execute().actionGet();
prepareCreate("test2").setSettings("index.number_of_shards", shardNo).execute().actionGet();

View File

@ -102,7 +102,7 @@ public class SimpleDistributorTests extends ElasticsearchIntegrationTest {
}
private void createIndexWithStoreType(String index, String storeType, String distributor) {
wipeIndex(index);
wipeIndices(index);
client().admin().indices().prepareCreate(index)
.setSettings(settingsBuilder()
.put("index.store.distributor", distributor)
@ -115,7 +115,7 @@ public class SimpleDistributorTests extends ElasticsearchIntegrationTest {
}
private void createIndexWithoutRateLimitingStoreType(String index, String storeType, String distributor) {
wipeIndex(index);
wipeIndices(index);
client().admin().indices().prepareCreate(index)
.setSettings(settingsBuilder()
.put("index.store.distributor", distributor)

View File

@ -96,12 +96,13 @@ public class SimpleNestedTests extends ElasticsearchIntegrationTest {
waitForRelocation(ClusterHealthStatus.GREEN);
// flush, so we fetch it from the index (as see that we filter nested docs)
flush();
GetResponse getResponse = run(client().prepareGet("test", "type1", "1"));
GetResponse getResponse = client().prepareGet("test", "type1", "1").get();
assertThat(getResponse.isExists(), equalTo(true));
assertThat(getResponse.getSourceAsBytes(), notNullValue());
// check the numDocs
IndicesStatusResponse statusResponse = run(admin().indices().prepareStatus());
IndicesStatusResponse statusResponse = admin().indices().prepareStatus().get();
assertNoFailures(statusResponse);
assertThat(statusResponse.getIndex("test").getDocs().getNumDocs(), equalTo(3l));
// check that _all is working on nested docs
@ -111,17 +112,17 @@ public class SimpleNestedTests extends ElasticsearchIntegrationTest {
assertThat(searchResponse.getHits().totalHits(), equalTo(0l));
// search for something that matches the nested doc, and see that we don't find the nested doc
searchResponse = run(client().prepareSearch("test").setQuery(matchAllQuery()));
searchResponse = client().prepareSearch("test").setQuery(matchAllQuery()).get();
assertThat(searchResponse.getHits().totalHits(), equalTo(1l));
searchResponse = run(client().prepareSearch("test").setQuery(termQuery("nested1.n_field1", "n_value1_1")));
searchResponse = client().prepareSearch("test").setQuery(termQuery("nested1.n_field1", "n_value1_1")).get();
assertThat(searchResponse.getHits().totalHits(), equalTo(0l));
// now, do a nested query
searchResponse = run(client().prepareSearch("test").setQuery(nestedQuery("nested1", termQuery("nested1.n_field1", "n_value1_1"))));
searchResponse = client().prepareSearch("test").setQuery(nestedQuery("nested1", termQuery("nested1.n_field1", "n_value1_1"))).get();
assertNoFailures(searchResponse);
assertThat(searchResponse.getHits().totalHits(), equalTo(1l));
searchResponse = run(client().prepareSearch("test").setQuery(nestedQuery("nested1", termQuery("nested1.n_field1", "n_value1_1"))).setSearchType(SearchType.DFS_QUERY_THEN_FETCH));
searchResponse = client().prepareSearch("test").setQuery(nestedQuery("nested1", termQuery("nested1.n_field1", "n_value1_1"))).setSearchType(SearchType.DFS_QUERY_THEN_FETCH).get();
assertNoFailures(searchResponse);
assertThat(searchResponse.getHits().totalHits(), equalTo(1l));
@ -143,7 +144,7 @@ public class SimpleNestedTests extends ElasticsearchIntegrationTest {
waitForRelocation(ClusterHealthStatus.GREEN);
// flush, so we fetch it from the index (as see that we filter nested docs)
flush();
statusResponse = run(client().admin().indices().prepareStatus());
statusResponse = client().admin().indices().prepareStatus().get();
assertThat(statusResponse.getIndex("test").getDocs().getNumDocs(), equalTo(6l));
searchResponse = client().prepareSearch("test").setQuery(nestedQuery("nested1",

View File

@ -284,7 +284,7 @@ public class PercolatorTests extends ElasticsearchIntegrationTest {
.setRefresh(true)
.execute().actionGet();
wipeIndex("test");
wipeIndices("test");
prepareCreate("test").setSettings(settingsBuilder().put("index.number_of_shards", 1)).execute().actionGet();
ensureGreen();
@ -1508,7 +1508,7 @@ public class PercolatorTests extends ElasticsearchIntegrationTest {
}
private CountDownLatch createCountDownLatch(String index) {
final CountDownLatch latch = new CountDownLatch(cluster().numNodes());
final CountDownLatch latch = new CountDownLatch(cluster().size());
Iterable<IndicesService> mapperServices = cluster().getInstances(IndicesService.class);
for (IndicesService indicesService : mapperServices) {
MapperService mapperService = indicesService.indexService(index).mapperService();

View File

@ -60,7 +60,7 @@ public class RecoveryPercolatorTests extends ElasticsearchIntegrationTest {
@Slow
public void testRestartNodePercolator1() throws Exception {
Settings settings = settingsBuilder()
.put(super.getSettings())
.put(super.indexSettings())
.put("gateway.type", "local")
.build();
cluster().startNode(settings);
@ -106,7 +106,7 @@ public class RecoveryPercolatorTests extends ElasticsearchIntegrationTest {
@Slow
public void testRestartNodePercolator2() throws Exception {
Settings settings = settingsBuilder()
.put(super.getSettings())
.put(super.indexSettings())
.put("gateway.type", "local")
.build();
cluster().startNode(settings);
@ -183,7 +183,7 @@ public class RecoveryPercolatorTests extends ElasticsearchIntegrationTest {
@Slow
public void testLoadingPercolateQueriesDuringCloseAndOpen() throws Exception {
Settings settings = settingsBuilder()
.put(super.getSettings())
.put(super.indexSettings())
.put("gateway.type", "local")
.build();
logger.info("--> Starting 2 nodes");

View File

@ -310,7 +310,7 @@ public class RecoveryWhileUnderLoadTests extends ElasticsearchIntegrationTest {
cluster().ensureAtLeastNumNodes(3);
logger.info("--> creating test index ...");
int allowNodes = 2;
assertAcked(prepareCreate("test").setSettings(randomSettingsBuilder().put("number_of_shards", numShards).put("number_of_replicas", numReplicas).build()));
assertAcked(prepareCreate("test").setSettings(ImmutableSettings.builder().put("number_of_shards", numShards).put("number_of_replicas", numReplicas).build()));
final AtomicLong idGenerator = new AtomicLong();
final AtomicLong indexCounter = new AtomicLong();
final AtomicBoolean stop = new AtomicBoolean(false);

View File

@ -38,7 +38,7 @@ import static org.hamcrest.Matchers.equalTo;
public class SimpleRecoveryTests extends ElasticsearchIntegrationTest {
@Override
public Settings getSettings() {
public Settings indexSettings() {
return recoverySettings();
}

View File

@ -44,7 +44,7 @@ public class AliasRoutingTests extends ElasticsearchIntegrationTest {
public void testAliasCrudRouting() throws Exception {
createIndex("test");
ensureGreen();
IndicesAliasesResponse res = run(admin().indices().prepareAliases().addAliasAction(newAddAliasAction("test", "alias0").routing("0")));
IndicesAliasesResponse res = admin().indices().prepareAliases().addAliasAction(newAddAliasAction("test", "alias0").routing("0")).get();
assertThat(res.isAcknowledged(), equalTo(true));
logger.info("--> indexing with id [1], and routing [0] using alias");
@ -125,11 +125,11 @@ public class AliasRoutingTests extends ElasticsearchIntegrationTest {
public void testAliasSearchRouting() throws Exception {
createIndex("test");
ensureGreen();
IndicesAliasesResponse res = run(admin().indices().prepareAliases()
IndicesAliasesResponse res = admin().indices().prepareAliases()
.addAliasAction(newAddAliasAction("test", "alias"))
.addAliasAction(newAddAliasAction("test", "alias0").routing("0"))
.addAliasAction(newAddAliasAction("test", "alias1").routing("1"))
.addAliasAction(newAddAliasAction("test", "alias01").searchRouting("0,1")));
.addAliasAction(newAddAliasAction("test", "alias01").searchRouting("0,1")).get();
assertThat(res.isAcknowledged(), equalTo(true));
logger.info("--> indexing with id [1], and routing [0] using alias");
@ -222,13 +222,13 @@ public class AliasRoutingTests extends ElasticsearchIntegrationTest {
createIndex("test-a");
createIndex("test-b");
ensureGreen();
IndicesAliasesResponse res = run(admin().indices().prepareAliases()
IndicesAliasesResponse res = admin().indices().prepareAliases()
.addAliasAction(newAddAliasAction("test-a", "alias-a0").routing("0"))
.addAliasAction(newAddAliasAction("test-a", "alias-a1").routing("1"))
.addAliasAction(newAddAliasAction("test-b", "alias-b0").routing("0"))
.addAliasAction(newAddAliasAction("test-b", "alias-b1").routing("1"))
.addAliasAction(newAddAliasAction("test-a", "alias-ab").searchRouting("0"))
.addAliasAction(newAddAliasAction("test-b", "alias-ab").searchRouting("1")));
.addAliasAction(newAddAliasAction("test-b", "alias-ab").searchRouting("1")).get();
assertThat(res.isAcknowledged(), equalTo(true));
ensureGreen(); // wait for events again to make sure we got the aliases on all nodes
logger.info("--> indexing with id [1], and routing [0] using alias to test-a");
@ -283,8 +283,8 @@ public class AliasRoutingTests extends ElasticsearchIntegrationTest {
public void testAliasSearchRoutingWithConcreteAndAliasedIndices_issue2682() throws Exception {
createIndex("index", "index_2");
ensureGreen();
IndicesAliasesResponse res = run(admin().indices().prepareAliases()
.addAliasAction(newAddAliasAction("index", "index_1").routing("1")));
IndicesAliasesResponse res = admin().indices().prepareAliases()
.addAliasAction(newAddAliasAction("index", "index_1").routing("1")).get();
assertThat(res.isAcknowledged(), equalTo(true));
logger.info("--> indexing on index_1 which is an alias for index with routing [1]");
@ -310,8 +310,8 @@ public class AliasRoutingTests extends ElasticsearchIntegrationTest {
public void testAliasSearchRoutingWithConcreteAndAliasedIndices_issue3268() throws Exception {
createIndex("index", "index_2");
ensureGreen();
IndicesAliasesResponse res = run(admin().indices().prepareAliases()
.addAliasAction(newAddAliasAction("index", "index_1").routing("1")));
IndicesAliasesResponse res = admin().indices().prepareAliases()
.addAliasAction(newAddAliasAction("index", "index_1").routing("1")).get();
assertThat(res.isAcknowledged(), equalTo(true));
logger.info("--> indexing on index_1 which is an alias for index with routing [1]");
@ -330,10 +330,10 @@ public class AliasRoutingTests extends ElasticsearchIntegrationTest {
@Test
public void testRequiredRoutingMappingWithAlias() throws Exception {
run(prepareCreate("test").addMapping(
prepareCreate("test").addMapping(
"type1",
XContentFactory.jsonBuilder().startObject().startObject("type1").startObject("_routing").field("required", true)
.endObject().endObject().endObject()));
.endObject().endObject().endObject()).get();
ensureGreen();
logger.info("--> indexing with id [1], and routing [0]");
client().prepareIndex("test", "type1", "1").setRouting("0").setSource("field", "value1").setRefresh(true).execute().actionGet();
@ -377,8 +377,8 @@ public class AliasRoutingTests extends ElasticsearchIntegrationTest {
createIndex("test");
ensureGreen();
logger.info("--> creating alias with routing [3]");
IndicesAliasesResponse res = run(admin().indices().prepareAliases()
.addAliasAction(newAddAliasAction("test", "alias").routing("3")));
IndicesAliasesResponse res = admin().indices().prepareAliases()
.addAliasAction(newAddAliasAction("test", "alias").routing("3")).get();
assertThat(res.isAcknowledged(), equalTo(true));
logger.info("--> indexing with id [0], and routing [3]");
@ -393,8 +393,8 @@ public class AliasRoutingTests extends ElasticsearchIntegrationTest {
}
logger.info("--> creating alias with routing [4]");
res = run(admin().indices().prepareAliases()
.addAliasAction(newAddAliasAction("test", "alias").routing("4")));
res = admin().indices().prepareAliases()
.addAliasAction(newAddAliasAction("test", "alias").routing("4")).get();
assertThat(res.isAcknowledged(), equalTo(true));
logger.info("--> verifying search with wrong routing should not find");

View File

@ -106,7 +106,7 @@ public class SearchWhileCreatingIndexTests extends ElasticsearchIntegrationTest
status = client().admin().cluster().prepareHealth("test").get().getStatus();
cluster().ensureAtLeastNumNodes(numberOfReplicas + 1);
}
wipeIndex("test");
wipeIndices("test");
}
}
}

View File

@ -4,6 +4,7 @@ import org.apache.lucene.util.LuceneTestCase.Slow;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.text.StringText;
import org.elasticsearch.common.text.Text;
@ -24,8 +25,8 @@ import static org.hamcrest.Matchers.equalTo;
public class ExtendedFacetsTests extends ElasticsearchIntegrationTest {
@Override
public Settings getSettings() {
return randomSettingsBuilder()
public Settings indexSettings() {
return ImmutableSettings.builder()
.put("index.number_of_shards", numberOfShards())
.put("index.number_of_replicas", 0)
.build();

View File

@ -29,6 +29,7 @@ import org.elasticsearch.client.Client;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.joda.Joda;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentFactory;
@ -69,8 +70,8 @@ public class SimpleFacetsTests extends ElasticsearchIntegrationTest {
private int numRuns = -1;
@Override
public Settings getSettings() {
return randomSettingsBuilder()
public Settings indexSettings() {
return ImmutableSettings.builder()
.put("index.number_of_shards", between(1, 5))
.put("index.number_of_replicas", 0)
.build();

View File

@ -21,6 +21,7 @@ package org.elasticsearch.search.facet.terms;
import com.google.common.collect.ImmutableMap;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.search.facet.Facets;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
@ -51,7 +52,7 @@ public class ShardSizeTermsFacetTests extends ElasticsearchIntegrationTest {
*/
@Override
protected Settings nodeSettings(int nodeOrdinal) {
return randomSettingsBuilder()
return ImmutableSettings.builder()
.put("index.number_of_shards", 2)
.put("index.number_of_replicas", 0)
.put("cluster.routing.operation.hash.type", "djb")

View File

@ -22,6 +22,7 @@ package org.elasticsearch.search.facet.terms;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.junit.Test;
@ -41,8 +42,8 @@ import static org.hamcrest.Matchers.is;
public class UnmappedFieldsTermsFacetsTests extends ElasticsearchIntegrationTest {
@Override
public Settings getSettings() {
return randomSettingsBuilder()
public Settings indexSettings() {
return ImmutableSettings.builder()
.put("index.number_of_shards", numberOfShards())
.put("index.number_of_replicas", 0)
.build();
@ -157,7 +158,7 @@ public class UnmappedFieldsTermsFacetsTests extends ElasticsearchIntegrationTest
@Test
public void testPartiallyUnmappedField() throws ElasticSearchException, IOException {
client().admin().indices().prepareCreate("mapped_idx")
.setSettings(getSettings())
.setSettings(indexSettings())
.addMapping("type", jsonBuilder().startObject().startObject("type").startObject("properties")
.startObject("partially_mapped_byte").field("type", "byte").endObject()
.startObject("partially_mapped_short").field("type", "short").endObject()
@ -285,7 +286,7 @@ public class UnmappedFieldsTermsFacetsTests extends ElasticsearchIntegrationTest
@Test
public void testMappedYetMissingField() throws IOException {
client().admin().indices().prepareCreate("idx")
.setSettings(getSettings())
.setSettings(indexSettings())
.addMapping("type", jsonBuilder().startObject()
.field("type").startObject()
.field("properties").startObject()

View File

@ -21,6 +21,7 @@ package org.elasticsearch.search.facet.termsstats;
import com.google.common.collect.ImmutableMap;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.search.facet.Facets;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
@ -51,7 +52,7 @@ public class ShardSizeTermsStatsFacetTests extends ElasticsearchIntegrationTest
*/
@Override
protected Settings nodeSettings(int nodeOrdinal) {
return randomSettingsBuilder()
return ImmutableSettings.builder()
.put("index.number_of_shards", 2)
.put("index.number_of_replicas", 0)
.put("cluster.routing.operation.hash.type", "djb")

View File

@ -26,6 +26,7 @@ import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.joda.Joda;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.search.sort.SortOrder;
@ -50,8 +51,8 @@ import static org.hamcrest.Matchers.*;
public class SearchFieldsTests extends ElasticsearchIntegrationTest {
@Override
public Settings getSettings() {
return randomSettingsBuilder()
public Settings indexSettings() {
return ImmutableSettings.builder()
.put("index.number_of_shards", 1) // why just one?
.put("index.number_of_replicas", 0)
.build();

View File

@ -36,7 +36,7 @@ public class SearchPreferenceTests extends ElasticsearchIntegrationTest {
@Test // see #2896
public void testStopOneNodePreferenceWithRedState() throws InterruptedException {
client().admin().indices().prepareCreate("test").setSettings(settingsBuilder().put("index.number_of_shards", cluster().numNodes()+2).put("index.number_of_replicas", 0)).execute().actionGet();
client().admin().indices().prepareCreate("test").setSettings(settingsBuilder().put("index.number_of_shards", cluster().size()+2).put("index.number_of_replicas", 0)).execute().actionGet();
client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().execute().actionGet();
for (int i = 0; i < 10; i++) {
client().prepareIndex("test", "type1", ""+i).setSource("field1", "value1").execute().actionGet();

View File

@ -62,7 +62,7 @@ public class SimpleQueryTests extends ElasticsearchIntegrationTest {
@Test // see https://github.com/elasticsearch/elasticsearch/issues/3177
public void testIssue3177() {
run(prepareCreate("test").setSettings(ImmutableSettings.settingsBuilder().put("index.number_of_shards", 1)));
prepareCreate("test").setSettings(ImmutableSettings.settingsBuilder().put("index.number_of_shards", 1)).get();
client().prepareIndex("test", "type1", "1").setSource("field1", "value1").execute().actionGet();
client().prepareIndex("test", "type1", "2").setSource("field1", "value2").execute().actionGet();
client().prepareIndex("test", "type1", "3").setSource("field1", "value3").execute().actionGet();

View File

@ -59,8 +59,8 @@ import static org.hamcrest.Matchers.*;
public class SimpleSortTests extends ElasticsearchIntegrationTest {
@Override
public Settings getSettings() {
return randomSettingsBuilder()
public Settings indexSettings() {
return ImmutableSettings.builder()
.put("index.number_of_shards", 3)
.put("index.number_of_replicas", 0)
.build();
@ -111,7 +111,7 @@ public class SimpleSortTests extends ElasticsearchIntegrationTest {
int numberOfShards = between(1, 10);
Random random = getRandom();
prepareCreate("test")
.setSettings(randomSettingsBuilder().put("index.number_of_shards", numberOfShards).put("index.number_of_replicas", 0))
.setSettings(ImmutableSettings.builder().put("index.number_of_shards", numberOfShards).put("index.number_of_replicas", 0))
.addMapping("type",
XContentFactory.jsonBuilder()
.startObject()
@ -236,7 +236,7 @@ public class SimpleSortTests extends ElasticsearchIntegrationTest {
@Test
public void testScoreSortDirection() throws Exception {
prepareCreate("test").setSettings(randomSettingsBuilder().put("index.number_of_shards", 1)).execute().actionGet();
prepareCreate("test").setSettings(ImmutableSettings.builder().put("index.number_of_shards", 1)).execute().actionGet();
ensureGreen();
client().prepareIndex("test", "type", "1").setSource("field", 2).execute().actionGet();
@ -268,7 +268,7 @@ public class SimpleSortTests extends ElasticsearchIntegrationTest {
@Test
public void testScoreSortDirection_withFunctionScore() throws Exception {
prepareCreate("test").setSettings(randomSettingsBuilder().put("index.number_of_shards", 1)).execute().actionGet();
prepareCreate("test").setSettings(ImmutableSettings.builder().put("index.number_of_shards", 1)).execute().actionGet();
ensureGreen();
client().prepareIndex("test", "type", "1").setSource("field", 2).execute().actionGet();
@ -299,7 +299,7 @@ public class SimpleSortTests extends ElasticsearchIntegrationTest {
@Test
public void testIssue2986() {
prepareCreate("test").setSettings(getSettings()).execute().actionGet();
prepareCreate("test").setSettings(indexSettings()).execute().actionGet();
client().prepareIndex("test", "post", "1").setSource("{\"field1\":\"value1\"}").execute().actionGet();
client().prepareIndex("test", "post", "2").setSource("{\"field1\":\"value2\"}").execute().actionGet();
@ -353,7 +353,7 @@ public class SimpleSortTests extends ElasticsearchIntegrationTest {
final int numberOfShards = between(1, 10);
Random random = getRandom();
prepareCreate("test")
.setSettings(randomSettingsBuilder().put("index.number_of_shards", numberOfShards).put("index.number_of_replicas", 0))
.setSettings(ImmutableSettings.builder().put("index.number_of_shards", numberOfShards).put("index.number_of_replicas", 0))
.addMapping("type1", XContentFactory.jsonBuilder().startObject().startObject("type1").startObject("properties")
.startObject("str_value").field("type", "string").field("index", "not_analyzed").startObject("fielddata").field("format", maybeDocValues() ? "doc_values" : null).endObject().endObject()
.startObject("boolean_value").field("type", "boolean").endObject()
@ -671,7 +671,7 @@ public class SimpleSortTests extends ElasticsearchIntegrationTest {
.startObject("svalue").field("type", "string").endObject()
.startObject("gvalue").field("type", "geo_point").endObject()
.endObject().endObject().endObject().string();
prepareCreate("test").setSettings(getSettings()).addMapping("type1", mapping).execute().actionGet();
prepareCreate("test").setSettings(indexSettings()).addMapping("type1", mapping).execute().actionGet();
ensureGreen();
for (int i = 0; i < 10; i++) {
@ -761,7 +761,7 @@ public class SimpleSortTests extends ElasticsearchIntegrationTest {
String mapping = jsonBuilder().startObject().startObject("type1").startObject("properties")
.startObject("svalue").field("type", "string").field("index", "not_analyzed").startObject("fielddata").field("format", maybeDocValues() ? "doc_values" : null).endObject().endObject()
.endObject().endObject().endObject().string();
prepareCreate("test").setSettings(getSettings()).addMapping("type1", mapping).execute().actionGet();
prepareCreate("test").setSettings(indexSettings()).addMapping("type1", mapping).execute().actionGet();
ensureGreen();
client().prepareIndex("test", "type1").setSource(jsonBuilder().startObject()
@ -1047,7 +1047,7 @@ public class SimpleSortTests extends ElasticsearchIntegrationTest {
@Test
public void testSortMVField() throws Exception {
prepareCreate("test")
.setSettings(randomSettingsBuilder().put("index.number_of_shards", 1).put("index.number_of_replicas", 0))
.setSettings(ImmutableSettings.builder().put("index.number_of_shards", 1).put("index.number_of_replicas", 0))
.addMapping("type1", XContentFactory.jsonBuilder().startObject().startObject("type1").startObject("properties")
.startObject("long_values").field("type", "long").startObject("fielddata").field("format", maybeDocValues() ? "doc_values" : null).endObject().endObject()
.startObject("int_values").field("type", "integer").startObject("fielddata").field("format", maybeDocValues() ? "doc_values" : null).endObject().endObject()
@ -1365,7 +1365,7 @@ public class SimpleSortTests extends ElasticsearchIntegrationTest {
@Test
public void testSortOnRareField() throws ElasticSearchException, IOException {
prepareCreate("test")
.setSettings(randomSettingsBuilder().put("index.number_of_shards", 1).put("index.number_of_replicas", 0))
.setSettings(ImmutableSettings.builder().put("index.number_of_shards", 1).put("index.number_of_replicas", 0))
.addMapping("type1", XContentFactory.jsonBuilder().startObject().startObject("type1").startObject("properties")
.startObject("string_values").field("type", "string").field("index", "not_analyzed").startObject("fielddata").field("format", maybeDocValues() ? "doc_values" : null).endObject().endObject()
.endObject().endObject().endObject())

View File

@ -24,6 +24,11 @@ import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.QueryBuilders;
@ -31,6 +36,7 @@ import org.elasticsearch.index.search.stats.SearchStats.Stats;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.junit.Test;
import java.util.HashSet;
import java.util.Set;
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
@ -41,8 +47,8 @@ import static org.hamcrest.Matchers.*;
public class SearchStatsTests extends ElasticsearchIntegrationTest {
@Override
public Settings getSettings() {
return randomSettingsBuilder()
public Settings indexSettings() {
return ImmutableSettings.builder()
.put("index.number_of_replicas", 0)
.build();
}
@ -103,6 +109,21 @@ public class SearchStatsTests extends ElasticsearchIntegrationTest {
}
private Set<String> nodeIdsWithIndex(String... indices) {
ClusterState state = client().admin().cluster().prepareState().execute().actionGet().getState();
GroupShardsIterator allAssignedShardsGrouped = state.routingTable().allAssignedShardsGrouped(indices, true);
Set<String> nodes = new HashSet<String>();
for (ShardIterator shardIterator : allAssignedShardsGrouped) {
for (ShardRouting routing : shardIterator.asUnordered()) {
if (routing.active()) {
nodes.add(routing.currentNodeId());
}
}
}
return nodes;
}
@Test
public void testOpenContexts() {
createIndex("test1");
@ -128,4 +149,10 @@ public class SearchStatsTests extends ElasticsearchIntegrationTest {
indicesStats = client().admin().indices().prepareStats().execute().actionGet();
assertThat(indicesStats.getTotal().getSearch().getOpenContexts(), equalTo(0l));
}
protected int numAssignedShards(String... indices) {
ClusterState state = client().admin().cluster().prepareState().execute().actionGet().getState();
GroupShardsIterator allAssignedShardsGrouped = state.routingTable().allAssignedShardsGrouped(indices, true);
return allAssignedShardsGrouped.size();
}
}

View File

@ -761,7 +761,7 @@ public class CompletionSuggestSearchTests extends ElasticsearchIntegrationTest {
private ImmutableSettings.Builder createDefaultSettings() {
int randomShardNumber = between(1, 5);
int randomReplicaNumber = between(0, cluster().numNodes() - 1);
int randomReplicaNumber = between(0, cluster().size() - 1);
return settingsBuilder().put(SETTING_NUMBER_OF_SHARDS, randomShardNumber).put(SETTING_NUMBER_OF_REPLICAS, randomReplicaNumber);
}

View File

@ -142,7 +142,7 @@ public class SuggestSearchTests extends ElasticsearchIntegrationTest {
public void testUnmappedField() throws IOException, InterruptedException, ExecutionException {
CreateIndexRequestBuilder builder = prepareCreate("test").setSettings(settingsBuilder()
.put(SETTING_NUMBER_OF_SHARDS, between(1,5))
.put(SETTING_NUMBER_OF_REPLICAS, between(0, cluster().numNodes() - 1))
.put(SETTING_NUMBER_OF_REPLICAS, between(0, cluster().size() - 1))
.put("index.analysis.analyzer.biword.tokenizer", "standard")
.putArray("index.analysis.analyzer.biword.filter", "shingler", "lowercase")
.put("index.analysis.filter.shingler.type", "shingle")
@ -690,7 +690,7 @@ public class SuggestSearchTests extends ElasticsearchIntegrationTest {
public void testShardFailures() throws IOException, InterruptedException {
CreateIndexRequestBuilder builder = prepareCreate("test").setSettings(settingsBuilder()
.put(SETTING_NUMBER_OF_SHARDS, between(1, 5))
.put(SETTING_NUMBER_OF_REPLICAS, between(0, cluster().numNodes() - 1))
.put(SETTING_NUMBER_OF_REPLICAS, between(0, cluster().size() - 1))
.put("index.analysis.analyzer.suggest.tokenizer", "standard")
.putArray("index.analysis.analyzer.suggest.filter", "standard", "lowercase", "shingler")
.put("index.analysis.filter.shingler.type", "shingle")
@ -798,7 +798,7 @@ public class SuggestSearchTests extends ElasticsearchIntegrationTest {
CreateIndexRequestBuilder builder = prepareCreate("test").setSettings(settingsBuilder()
.put(SETTING_NUMBER_OF_SHARDS, numberOfShards)
.put(SETTING_NUMBER_OF_REPLICAS, between(0, cluster().numNodes() - 1))
.put(SETTING_NUMBER_OF_REPLICAS, between(0, cluster().size() - 1))
.put("index.analysis.analyzer.body.tokenizer", "standard")
.putArray("index.analysis.analyzer.body.filter", "lowercase", "my_shingle")
.put("index.analysis.filter.my_shingle.type", "shingle")

View File

@ -20,6 +20,7 @@
package org.elasticsearch.search.timeout;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.junit.Test;
@ -34,8 +35,8 @@ import static org.hamcrest.Matchers.equalTo;
public class SearchTimeoutTests extends ElasticsearchIntegrationTest {
@Override
public Settings getSettings() {
return randomSettingsBuilder()
public Settings indexSettings() {
return ImmutableSettings.builder()
.put("index.number_of_shards", 2)
.put("index.number_of_replicas", 0)
.build();

View File

@ -104,7 +104,7 @@ public abstract class AbstractSnapshotTests extends ElasticsearchIntegrationTest
public String waitForCompletionOrBlock(Collection<String> nodes, String repository, String snapshot, TimeValue timeout) throws InterruptedException {
long start = System.currentTimeMillis();
while (System.currentTimeMillis() - start < timeout.millis()) {
ImmutableList<SnapshotInfo> snapshotInfos = run(client().admin().cluster().prepareGetSnapshots(repository).setSnapshots(snapshot)).getSnapshots();
ImmutableList<SnapshotInfo> snapshotInfos = client().admin().cluster().prepareGetSnapshots(repository).setSnapshots(snapshot).get().getSnapshots();
assertThat(snapshotInfos.size(), equalTo(1));
if (snapshotInfos.get(0).state().completed()) {
return null;
@ -124,7 +124,7 @@ public abstract class AbstractSnapshotTests extends ElasticsearchIntegrationTest
public SnapshotInfo waitForCompletion(String repository, String snapshot, TimeValue timeout) throws InterruptedException {
long start = System.currentTimeMillis();
while (System.currentTimeMillis() - start < timeout.millis()) {
ImmutableList<SnapshotInfo> snapshotInfos = run(client().admin().cluster().prepareGetSnapshots(repository).setSnapshots(snapshot)).getSnapshots();
ImmutableList<SnapshotInfo> snapshotInfos = client().admin().cluster().prepareGetSnapshots(repository).setSnapshots(snapshot).get().getSnapshots();
assertThat(snapshotInfos.size(), equalTo(1));
if (snapshotInfos.get(0).state().completed()) {
return snapshotInfos.get(0);

View File

@ -49,7 +49,7 @@ public class DedicatedClusterSnapshotRestoreTests extends AbstractSnapshotTests
public void restorePersistentSettingsTest() throws Exception {
logger.info("--> start node");
cluster().startNode(settingsBuilder().put("gateway.type", "local"));
Client client = cluster().client();
Client client = client();
// Add dummy persistent setting
logger.info("--> set test persistent setting");
@ -87,7 +87,7 @@ public class DedicatedClusterSnapshotRestoreTests extends AbstractSnapshotTests
ArrayList<String> nodes = newArrayList();
nodes.add(cluster().startNode());
nodes.add(cluster().startNode());
Client client = cluster().client();
Client client = client();
assertAcked(prepareCreate("test-idx", 2, settingsBuilder().put("number_of_shards", 2).put("number_of_replicas", 0).put(MockDirectoryHelper.RANDOM_NO_DELETE_OPEN_FILE, false)));
ensureGreen();
@ -97,22 +97,22 @@ public class DedicatedClusterSnapshotRestoreTests extends AbstractSnapshotTests
index("test-idx", "doc", Integer.toString(i), "foo", "bar" + i);
}
refresh();
assertThat(run(client.prepareCount("test-idx")).getCount(), equalTo(100L));
assertThat(client.prepareCount("test-idx").get().getCount(), equalTo(100L));
logger.info("--> create repository");
logger.info("--> creating repository");
PutRepositoryResponse putRepositoryResponse = run(client.admin().cluster().preparePutRepository("test-repo")
PutRepositoryResponse putRepositoryResponse = client.admin().cluster().preparePutRepository("test-repo")
.setType(MockRepositoryModule.class.getCanonicalName()).setSettings(
ImmutableSettings.settingsBuilder()
.put("location", newTempDir(LifecycleScope.TEST))
.put("random", randomAsciiOfLength(10))
.put("random_data_file_blocking_rate", 0.1)
.put("wait_after_unblock", 200)
));
).get();
assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true));
logger.info("--> snapshot");
run(client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap").setWaitForCompletion(false).setIndices("test-idx"));
client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap").setWaitForCompletion(false).setIndices("test-idx").get();
String blockedNode = waitForCompletionOrBlock(nodes, "test-repo", "test-snap", TimeValue.timeValueSeconds(60));
if (blockedNode != null) {
@ -126,7 +126,7 @@ public class DedicatedClusterSnapshotRestoreTests extends AbstractSnapshotTests
logger.info("--> done");
} else {
logger.info("--> done without blocks");
ImmutableList<SnapshotInfo> snapshotInfos = run(client().admin().cluster().prepareGetSnapshots("test-repo").setSnapshots("test-snap")).getSnapshots();
ImmutableList<SnapshotInfo> snapshotInfos = client().admin().cluster().prepareGetSnapshots("test-repo").setSnapshots("test-snap").get().getSnapshots();
assertThat(snapshotInfos.size(), equalTo(1));
assertThat(snapshotInfos.get(0).state(), equalTo(SnapshotState.SUCCESS));
assertThat(snapshotInfos.get(0).shardFailures().size(), equalTo(0));

View File

@ -44,14 +44,14 @@ public class RepositoriesTests extends AbstractSnapshotTests {
Client client = client();
logger.info("--> creating repository");
PutRepositoryResponse putRepositoryResponse = run(client.admin().cluster().preparePutRepository("test-repo-1")
PutRepositoryResponse putRepositoryResponse = client.admin().cluster().preparePutRepository("test-repo-1")
.setType("fs").setSettings(ImmutableSettings.settingsBuilder()
.put("location", newTempDir(LifecycleScope.SUITE))
));
).get();
assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true));
logger.info("--> check that repository is really there");
ClusterStateResponse clusterStateResponse = run(client.admin().cluster().prepareState().setFilterAll().setFilterMetaData(false));
ClusterStateResponse clusterStateResponse = client.admin().cluster().prepareState().setFilterAll().setFilterMetaData(false).get();
MetaData metaData = clusterStateResponse.getState().getMetaData();
RepositoriesMetaData repositoriesMetaData = metaData.custom(RepositoriesMetaData.TYPE);
assertThat(repositoriesMetaData, notNullValue());
@ -59,14 +59,14 @@ public class RepositoriesTests extends AbstractSnapshotTests {
assertThat(repositoriesMetaData.repository("test-repo-1").type(), equalTo("fs"));
logger.info("--> creating anoter repository");
putRepositoryResponse = run(client.admin().cluster().preparePutRepository("test-repo-2")
putRepositoryResponse = client.admin().cluster().preparePutRepository("test-repo-2")
.setType("fs").setSettings(ImmutableSettings.settingsBuilder()
.put("location", newTempDir(LifecycleScope.SUITE))
));
).get();
assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true));
logger.info("--> check that both repositories are in cluster state");
clusterStateResponse = run(client.admin().cluster().prepareState().setFilterAll().setFilterMetaData(false));
clusterStateResponse = client.admin().cluster().prepareState().setFilterAll().setFilterMetaData(false).get();
metaData = clusterStateResponse.getState().getMetaData();
repositoriesMetaData = metaData.custom(RepositoriesMetaData.TYPE);
assertThat(repositoriesMetaData, notNullValue());
@ -77,20 +77,20 @@ public class RepositoriesTests extends AbstractSnapshotTests {
assertThat(repositoriesMetaData.repository("test-repo-2").type(), equalTo("fs"));
logger.info("--> check that both repositories can be retrieved by getRepositories query");
GetRepositoriesResponse repositoriesResponse = run(client.admin().cluster().prepareGetRepositories());
GetRepositoriesResponse repositoriesResponse = client.admin().cluster().prepareGetRepositories().get();
assertThat(repositoriesResponse.repositories().size(), equalTo(2));
assertThat(findRepository(repositoriesResponse.repositories(), "test-repo-1"), notNullValue());
assertThat(findRepository(repositoriesResponse.repositories(), "test-repo-2"), notNullValue());
logger.info("--> delete repository test-repo-1");
run(client.admin().cluster().prepareDeleteRepository("test-repo-1"));
repositoriesResponse = run(client.admin().cluster().prepareGetRepositories());
client.admin().cluster().prepareDeleteRepository("test-repo-1").get();
repositoriesResponse = client.admin().cluster().prepareGetRepositories().get();
assertThat(repositoriesResponse.repositories().size(), equalTo(1));
assertThat(findRepository(repositoriesResponse.repositories(), "test-repo-2"), notNullValue());
logger.info("--> delete repository test-repo-2");
run(client.admin().cluster().prepareDeleteRepository("test-repo-2"));
repositoriesResponse = run(client.admin().cluster().prepareGetRepositories());
client.admin().cluster().prepareDeleteRepository("test-repo-2").get();
repositoriesResponse = client.admin().cluster().prepareGetRepositories().get();
assertThat(repositoriesResponse.repositories().size(), equalTo(0));
}
@ -109,7 +109,7 @@ public class RepositoriesTests extends AbstractSnapshotTests {
logger.info("--> trying creating repository with incorrect settings");
try {
run(client.admin().cluster().preparePutRepository("test-repo").setType("fs"));
client.admin().cluster().preparePutRepository("test-repo").setType("fs").get();
fail("Shouldn't be here");
} catch (RepositoryException ex) {
// Expected
@ -120,31 +120,31 @@ public class RepositoriesTests extends AbstractSnapshotTests {
public void repositoryAckTimeoutTest() throws Exception {
logger.info("--> creating repository test-repo-1 with 0s timeout - shouldn't ack");
PutRepositoryResponse putRepositoryResponse = run(client().admin().cluster().preparePutRepository("test-repo-1")
PutRepositoryResponse putRepositoryResponse = client().admin().cluster().preparePutRepository("test-repo-1")
.setType("fs").setSettings(ImmutableSettings.settingsBuilder()
.put("location", newTempDir(LifecycleScope.SUITE))
.put("compress", randomBoolean())
.put("chunk_size", randomIntBetween(5, 100))
)
.setTimeout("0s"));
.setTimeout("0s").get();
assertThat(putRepositoryResponse.isAcknowledged(), equalTo(false));
logger.info("--> creating repository test-repo-2 with standard timeout - should ack");
putRepositoryResponse = run(client().admin().cluster().preparePutRepository("test-repo-2")
putRepositoryResponse = client().admin().cluster().preparePutRepository("test-repo-2")
.setType("fs").setSettings(ImmutableSettings.settingsBuilder()
.put("location", newTempDir(LifecycleScope.SUITE))
.put("compress", randomBoolean())
.put("chunk_size", randomIntBetween(5, 100))
));
).get();
assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true));
logger.info("--> deleting repository test-repo-2 with 0s timeout - shouldn't ack");
DeleteRepositoryResponse deleteRepositoryResponse = run(client().admin().cluster().prepareDeleteRepository("test-repo-2")
.setTimeout("0s"));
DeleteRepositoryResponse deleteRepositoryResponse = client().admin().cluster().prepareDeleteRepository("test-repo-2")
.setTimeout("0s").get();
assertThat(deleteRepositoryResponse.isAcknowledged(), equalTo(false));
logger.info("--> deleting repository test-repo-1 with standard timeout - should ack");
deleteRepositoryResponse = run(client().admin().cluster().prepareDeleteRepository("test-repo-1"));
deleteRepositoryResponse = client().admin().cluster().prepareDeleteRepository("test-repo-1").get();
assertThat(deleteRepositoryResponse.isAcknowledged(), equalTo(true));
}

View File

@ -54,10 +54,10 @@ public class SharedClusterSnapshotRestoreTests extends AbstractSnapshotTests {
@Override
public Settings getSettings() {
public Settings indexSettings() {
// During restore we frequently restore index to exactly the same state it was before, that might cause the same
// checksum file to be written twice during restore operation
return ImmutableSettings.builder().put(super.getSettings())
return ImmutableSettings.builder().put(super.indexSettings())
.put(MockDirectoryHelper.RANDOM_PREVENT_DOUBLE_WRITE, false)
.put(MockDirectoryHelper.RANDOM_NO_DELETE_OPEN_FILE, false) //TODO: Ask Simon if this is hiding an issue
.build();
@ -68,12 +68,12 @@ public class SharedClusterSnapshotRestoreTests extends AbstractSnapshotTests {
Client client = client();
logger.info("--> creating repository");
PutRepositoryResponse putRepositoryResponse = run(client.admin().cluster().preparePutRepository("test-repo")
PutRepositoryResponse putRepositoryResponse = client.admin().cluster().preparePutRepository("test-repo")
.setType("fs").setSettings(ImmutableSettings.settingsBuilder()
.put("location", newTempDir(LifecycleScope.SUITE))
.put("compress", randomBoolean())
.put("chunk_size", randomIntBetween(5, 100))
));
).get();
assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true));
createIndex("test-idx-1", "test-idx-2", "test-idx-3");
@ -86,43 +86,43 @@ public class SharedClusterSnapshotRestoreTests extends AbstractSnapshotTests {
index("test-idx-3", "doc", Integer.toString(i), "foo", "baz" + i);
}
refresh();
assertThat(run(client.prepareCount("test-idx-1")).getCount(), equalTo(100L));
assertThat(run(client.prepareCount("test-idx-2")).getCount(), equalTo(100L));
assertThat(run(client.prepareCount("test-idx-3")).getCount(), equalTo(100L));
assertThat(client.prepareCount("test-idx-1").get().getCount(), equalTo(100L));
assertThat(client.prepareCount("test-idx-2").get().getCount(), equalTo(100L));
assertThat(client.prepareCount("test-idx-3").get().getCount(), equalTo(100L));
logger.info("--> snapshot");
CreateSnapshotResponse createSnapshotResponse = run(client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap").setWaitForCompletion(true).setIndices("test-idx-*", "-test-idx-3"));
CreateSnapshotResponse createSnapshotResponse = client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap").setWaitForCompletion(true).setIndices("test-idx-*", "-test-idx-3").get();
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0));
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), equalTo(createSnapshotResponse.getSnapshotInfo().totalShards()));
assertThat(run(client.admin().cluster().prepareGetSnapshots("test-repo").setSnapshots("test-snap")).getSnapshots().get(0).state(), equalTo(SnapshotState.SUCCESS));
assertThat(client.admin().cluster().prepareGetSnapshots("test-repo").setSnapshots("test-snap").get().getSnapshots().get(0).state(), equalTo(SnapshotState.SUCCESS));
logger.info("--> delete some data");
for (int i = 0; i < 50; i++) {
run(client.prepareDelete("test-idx-1", "doc", Integer.toString(i)));
client.prepareDelete("test-idx-1", "doc", Integer.toString(i)).get();
}
for (int i = 50; i < 100; i++) {
run(client.prepareDelete("test-idx-2", "doc", Integer.toString(i)));
client.prepareDelete("test-idx-2", "doc", Integer.toString(i)).get();
}
for (int i = 0; i < 100; i += 2) {
run(client.prepareDelete("test-idx-3", "doc", Integer.toString(i)));
client.prepareDelete("test-idx-3", "doc", Integer.toString(i)).get();
}
refresh();
assertThat(run(client.prepareCount("test-idx-1")).getCount(), equalTo(50L));
assertThat(run(client.prepareCount("test-idx-2")).getCount(), equalTo(50L));
assertThat(run(client.prepareCount("test-idx-3")).getCount(), equalTo(50L));
assertThat(client.prepareCount("test-idx-1").get().getCount(), equalTo(50L));
assertThat(client.prepareCount("test-idx-2").get().getCount(), equalTo(50L));
assertThat(client.prepareCount("test-idx-3").get().getCount(), equalTo(50L));
logger.info("--> close indices");
run(client.admin().indices().prepareClose("test-idx-1", "test-idx-2"));
client.admin().indices().prepareClose("test-idx-1", "test-idx-2").get();
logger.info("--> restore all indices from the snapshot");
RestoreSnapshotResponse restoreSnapshotResponse = client.admin().cluster().prepareRestoreSnapshot("test-repo", "test-snap").setWaitForCompletion(true).execute().actionGet();
assertThat(restoreSnapshotResponse.getRestoreInfo().totalShards(), greaterThan(0));
ensureGreen();
assertThat(run(client.prepareCount("test-idx-1")).getCount(), equalTo(100L));
assertThat(run(client.prepareCount("test-idx-2")).getCount(), equalTo(100L));
assertThat(run(client.prepareCount("test-idx-3")).getCount(), equalTo(50L));
assertThat(client.prepareCount("test-idx-1").get().getCount(), equalTo(100L));
assertThat(client.prepareCount("test-idx-2").get().getCount(), equalTo(100L));
assertThat(client.prepareCount("test-idx-3").get().getCount(), equalTo(50L));
// Test restore after index deletion
logger.info("--> delete indices");
@ -131,8 +131,8 @@ public class SharedClusterSnapshotRestoreTests extends AbstractSnapshotTests {
restoreSnapshotResponse = client.admin().cluster().prepareRestoreSnapshot("test-repo", "test-snap").setWaitForCompletion(true).setIndices("test-idx-*", "-test-idx-2").execute().actionGet();
assertThat(restoreSnapshotResponse.getRestoreInfo().totalShards(), greaterThan(0));
ensureGreen();
assertThat(run(client.prepareCount("test-idx-1")).getCount(), equalTo(100L));
ClusterState clusterState = run(client.admin().cluster().prepareState()).getState();
assertThat(client.prepareCount("test-idx-1").get().getCount(), equalTo(100L));
ClusterState clusterState = client.admin().cluster().prepareState().get().getState();
assertThat(clusterState.getMetaData().hasIndex("test-idx-1"), equalTo(true));
assertThat(clusterState.getMetaData().hasIndex("test-idx-2"), equalTo(false));
}
@ -142,16 +142,16 @@ public class SharedClusterSnapshotRestoreTests extends AbstractSnapshotTests {
Client client = client();
logger.info("--> creating repository");
PutRepositoryResponse putRepositoryResponse = run(client.admin().cluster().preparePutRepository("test-repo")
.setType("fs").setSettings(ImmutableSettings.settingsBuilder().put("location", newTempDir())));
PutRepositoryResponse putRepositoryResponse = client.admin().cluster().preparePutRepository("test-repo")
.setType("fs").setSettings(ImmutableSettings.settingsBuilder().put("location", newTempDir())).get();
assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true));
logger.info("--> snapshot");
CreateSnapshotResponse createSnapshotResponse = run(client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap").setWaitForCompletion(true));
CreateSnapshotResponse createSnapshotResponse = client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap").setWaitForCompletion(true).get();
assertThat(createSnapshotResponse.getSnapshotInfo().totalShards(), equalTo(0));
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), equalTo(0));
assertThat(run(client.admin().cluster().prepareGetSnapshots("test-repo").setSnapshots("test-snap")).getSnapshots().get(0).state(), equalTo(SnapshotState.SUCCESS));
assertThat(client.admin().cluster().prepareGetSnapshots("test-repo").setSnapshots("test-snap").get().getSnapshots().get(0).state(), equalTo(SnapshotState.SUCCESS));
}
@Test
@ -159,22 +159,22 @@ public class SharedClusterSnapshotRestoreTests extends AbstractSnapshotTests {
Client client = client();
logger.info("--> creating repository");
PutRepositoryResponse putRepositoryResponse = run(client.admin().cluster().preparePutRepository("test-repo")
.setType("fs").setSettings(ImmutableSettings.settingsBuilder().put("location", newTempDir())));
PutRepositoryResponse putRepositoryResponse = client.admin().cluster().preparePutRepository("test-repo")
.setType("fs").setSettings(ImmutableSettings.settingsBuilder().put("location", newTempDir())).get();
assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true));
logger.info("--> creating test template");
assertThat(run(client.admin().indices().preparePutTemplate("test-template").setTemplate("te*").addMapping("test-mapping", "{}")).isAcknowledged(), equalTo(true));
assertThat(client.admin().indices().preparePutTemplate("test-template").setTemplate("te*").addMapping("test-mapping", "{}").get().isAcknowledged(), equalTo(true));
logger.info("--> snapshot");
CreateSnapshotResponse createSnapshotResponse = run(client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap").setIndices().setWaitForCompletion(true));
CreateSnapshotResponse createSnapshotResponse = client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap").setIndices().setWaitForCompletion(true).get();
assertThat(createSnapshotResponse.getSnapshotInfo().totalShards(), equalTo(0));
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), equalTo(0));
assertThat(run(client.admin().cluster().prepareGetSnapshots("test-repo").setSnapshots("test-snap")).getSnapshots().get(0).state(), equalTo(SnapshotState.SUCCESS));
assertThat(client.admin().cluster().prepareGetSnapshots("test-repo").setSnapshots("test-snap").get().getSnapshots().get(0).state(), equalTo(SnapshotState.SUCCESS));
logger.info("--> delete test template");
assertThat(run(client.admin().indices().prepareDeleteTemplate("test-template")).isAcknowledged(), equalTo(true));
ClusterStateResponse clusterStateResponse = run(client.admin().cluster().prepareState().setFilterRoutingTable(true).setFilterNodes(true).setFilterIndexTemplates("test-template").setFilterIndices());
assertThat(client.admin().indices().prepareDeleteTemplate("test-template").get().isAcknowledged(), equalTo(true));
ClusterStateResponse clusterStateResponse = client.admin().cluster().prepareState().setFilterRoutingTable(true).setFilterNodes(true).setFilterIndexTemplates("test-template").setFilterIndices().get();
assertThat(clusterStateResponse.getState().getMetaData().templates().containsKey("test-template"), equalTo(false));
logger.info("--> restore cluster state");
@ -183,7 +183,7 @@ public class SharedClusterSnapshotRestoreTests extends AbstractSnapshotTests {
assertThat(restoreSnapshotResponse.getRestoreInfo().totalShards(), equalTo(0));
logger.info("--> check that template is restored");
clusterStateResponse = run(client.admin().cluster().prepareState().setFilterRoutingTable(true).setFilterNodes(true).setFilterIndexTemplates("test-template").setFilterIndices());
clusterStateResponse = client.admin().cluster().prepareState().setFilterRoutingTable(true).setFilterNodes(true).setFilterIndexTemplates("test-template").setFilterIndices().get();
assertThat(clusterStateResponse.getState().getMetaData().templates().containsKey("test-template"), equalTo(true));
}
@ -193,28 +193,28 @@ public class SharedClusterSnapshotRestoreTests extends AbstractSnapshotTests {
logger.info("--> creating repository");
File location = newTempDir();
PutRepositoryResponse putRepositoryResponse = run(client.admin().cluster().preparePutRepository("test-repo")
.setType("fs").setSettings(ImmutableSettings.settingsBuilder().put("location", location)));
PutRepositoryResponse putRepositoryResponse = client.admin().cluster().preparePutRepository("test-repo")
.setType("fs").setSettings(ImmutableSettings.settingsBuilder().put("location", location)).get();
assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true));
logger.info("--> creating test template");
assertThat(run(client.admin().indices().preparePutTemplate("test-template").setTemplate("te*").addMapping("test-mapping", "{}")).isAcknowledged(), equalTo(true));
assertThat(client.admin().indices().preparePutTemplate("test-template").setTemplate("te*").addMapping("test-mapping", "{}").get().isAcknowledged(), equalTo(true));
logger.info("--> snapshot without global state");
CreateSnapshotResponse createSnapshotResponse = run(client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap-no-global-state").setIndices().setIncludeGlobalState(false).setWaitForCompletion(true));
CreateSnapshotResponse createSnapshotResponse = client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap-no-global-state").setIndices().setIncludeGlobalState(false).setWaitForCompletion(true).get();
assertThat(createSnapshotResponse.getSnapshotInfo().totalShards(), equalTo(0));
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), equalTo(0));
assertThat(run(client.admin().cluster().prepareGetSnapshots("test-repo").setSnapshots("test-snap-no-global-state")).getSnapshots().get(0).state(), equalTo(SnapshotState.SUCCESS));
assertThat(client.admin().cluster().prepareGetSnapshots("test-repo").setSnapshots("test-snap-no-global-state").get().getSnapshots().get(0).state(), equalTo(SnapshotState.SUCCESS));
logger.info("--> snapshot with global state");
createSnapshotResponse = run(client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap-with-global-state").setIndices().setIncludeGlobalState(true).setWaitForCompletion(true));
createSnapshotResponse = client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap-with-global-state").setIndices().setIncludeGlobalState(true).setWaitForCompletion(true).get();
assertThat(createSnapshotResponse.getSnapshotInfo().totalShards(), equalTo(0));
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), equalTo(0));
assertThat(run(client.admin().cluster().prepareGetSnapshots("test-repo").setSnapshots("test-snap-with-global-state")).getSnapshots().get(0).state(), equalTo(SnapshotState.SUCCESS));
assertThat(client.admin().cluster().prepareGetSnapshots("test-repo").setSnapshots("test-snap-with-global-state").get().getSnapshots().get(0).state(), equalTo(SnapshotState.SUCCESS));
logger.info("--> delete test template");
wipeTemplates("test-template");
ClusterStateResponse clusterStateResponse = run(client.admin().cluster().prepareState().setFilterRoutingTable(true).setFilterNodes(true).setFilterIndexTemplates("test-template").setFilterIndices());
ClusterStateResponse clusterStateResponse = client.admin().cluster().prepareState().setFilterRoutingTable(true).setFilterNodes(true).setFilterIndexTemplates("test-template").setFilterIndices().get();
assertThat(clusterStateResponse.getState().getMetaData().templates().containsKey("test-template"), equalTo(false));
logger.info("--> try restoring cluster state from snapshot without global state");
@ -222,7 +222,7 @@ public class SharedClusterSnapshotRestoreTests extends AbstractSnapshotTests {
assertThat(restoreSnapshotResponse.getRestoreInfo().totalShards(), equalTo(0));
logger.info("--> check that template wasn't restored");
clusterStateResponse = run(client.admin().cluster().prepareState().setFilterRoutingTable(true).setFilterNodes(true).setFilterIndexTemplates("test-template").setFilterIndices());
clusterStateResponse = client.admin().cluster().prepareState().setFilterRoutingTable(true).setFilterNodes(true).setFilterIndexTemplates("test-template").setFilterIndices().get();
assertThat(clusterStateResponse.getState().getMetaData().templates().containsKey("test-template"), equalTo(false));
logger.info("--> restore cluster state");
@ -230,7 +230,7 @@ public class SharedClusterSnapshotRestoreTests extends AbstractSnapshotTests {
assertThat(restoreSnapshotResponse.getRestoreInfo().totalShards(), equalTo(0));
logger.info("--> check that template is restored");
clusterStateResponse = run(client.admin().cluster().prepareState().setFilterRoutingTable(true).setFilterNodes(true).setFilterIndexTemplates("test-template").setFilterIndices());
clusterStateResponse = client.admin().cluster().prepareState().setFilterRoutingTable(true).setFilterNodes(true).setFilterIndexTemplates("test-template").setFilterIndices().get();
assertThat(clusterStateResponse.getState().getMetaData().templates().containsKey("test-template"), equalTo(true));
createIndex("test-idx");
@ -241,18 +241,18 @@ public class SharedClusterSnapshotRestoreTests extends AbstractSnapshotTests {
index("test-idx", "doc", Integer.toString(i), "foo", "bar" + i);
}
refresh();
assertThat(run(client.prepareCount("test-idx")).getCount(), equalTo(100L));
assertThat(client.prepareCount("test-idx").get().getCount(), equalTo(100L));
logger.info("--> snapshot without global state but with indices");
createSnapshotResponse = run(client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap-no-global-state-with-index").setIndices("test-idx").setIncludeGlobalState(false).setWaitForCompletion(true));
createSnapshotResponse = client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap-no-global-state-with-index").setIndices("test-idx").setIncludeGlobalState(false).setWaitForCompletion(true).get();
assertThat(createSnapshotResponse.getSnapshotInfo().totalShards(), greaterThan(0));
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), equalTo(createSnapshotResponse.getSnapshotInfo().totalShards()));
assertThat(run(client.admin().cluster().prepareGetSnapshots("test-repo").setSnapshots("test-snap-no-global-state-with-index")).getSnapshots().get(0).state(), equalTo(SnapshotState.SUCCESS));
assertThat(client.admin().cluster().prepareGetSnapshots("test-repo").setSnapshots("test-snap-no-global-state-with-index").get().getSnapshots().get(0).state(), equalTo(SnapshotState.SUCCESS));
logger.info("--> delete test template and index ");
wipeIndex("test-idx");
wipeIndices("test-idx");
wipeTemplates("test-template");
clusterStateResponse = run(client.admin().cluster().prepareState().setFilterRoutingTable(true).setFilterNodes(true).setFilterIndexTemplates("test-template").setFilterIndices());
clusterStateResponse = client.admin().cluster().prepareState().setFilterRoutingTable(true).setFilterNodes(true).setFilterIndexTemplates("test-template").setFilterIndices().get();
assertThat(clusterStateResponse.getState().getMetaData().templates().containsKey("test-template"), equalTo(false));
logger.info("--> try restoring index and cluster state from snapshot without global state");
@ -262,9 +262,9 @@ public class SharedClusterSnapshotRestoreTests extends AbstractSnapshotTests {
ensureGreen();
logger.info("--> check that template wasn't restored but index was");
clusterStateResponse = run(client.admin().cluster().prepareState().setFilterRoutingTable(true).setFilterNodes(true).setFilterIndexTemplates("test-template").setFilterIndices());
clusterStateResponse = client.admin().cluster().prepareState().setFilterRoutingTable(true).setFilterNodes(true).setFilterIndexTemplates("test-template").setFilterIndices().get();
assertThat(clusterStateResponse.getState().getMetaData().templates().containsKey("test-template"), equalTo(false));
assertThat(run(client.prepareCount("test-idx")).getCount(), equalTo(100L));
assertThat(client.prepareCount("test-idx").get().getCount(), equalTo(100L));
}
@ -274,13 +274,13 @@ public class SharedClusterSnapshotRestoreTests extends AbstractSnapshotTests {
Client client = client();
logger.info("--> creating repository");
PutRepositoryResponse putRepositoryResponse = run(client.admin().cluster().preparePutRepository("test-repo")
PutRepositoryResponse putRepositoryResponse = client.admin().cluster().preparePutRepository("test-repo")
.setType(MockRepositoryModule.class.getCanonicalName()).setSettings(
ImmutableSettings.settingsBuilder()
.put("location", newTempDir(LifecycleScope.TEST))
.put("random", randomAsciiOfLength(10))
.put("random_control_io_exception_rate", 0.2)
));
).get();
assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true));
createIndex("test-idx");
@ -291,11 +291,11 @@ public class SharedClusterSnapshotRestoreTests extends AbstractSnapshotTests {
index("test-idx", "doc", Integer.toString(i), "foo", "bar" + i);
}
refresh();
assertThat(run(client.prepareCount("test-idx")).getCount(), equalTo(100L));
assertThat(client.prepareCount("test-idx").get().getCount(), equalTo(100L));
logger.info("--> snapshot");
try {
CreateSnapshotResponse createSnapshotResponse = run(client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap").setWaitForCompletion(true).setIndices("test-idx"));
CreateSnapshotResponse createSnapshotResponse = client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap").setWaitForCompletion(true).setIndices("test-idx").get();
if (createSnapshotResponse.getSnapshotInfo().totalShards() == createSnapshotResponse.getSnapshotInfo().successfulShards()) {
// If we are here, that means we didn't have any failures, let's check it
assertThat(getFailureCount("test-repo"), equalTo(0L));
@ -307,7 +307,7 @@ public class SharedClusterSnapshotRestoreTests extends AbstractSnapshotTests {
assertThat(shardFailure.nodeId(), notNullValue());
assertThat(shardFailure.index(), equalTo("test-idx"));
}
GetSnapshotsResponse getSnapshotsResponse = run(client.admin().cluster().prepareGetSnapshots("test-repo").addSnapshots("test-snap"));
GetSnapshotsResponse getSnapshotsResponse = client.admin().cluster().prepareGetSnapshots("test-repo").addSnapshots("test-snap").get();
assertThat(getSnapshotsResponse.getSnapshots().size(), equalTo(1));
SnapshotInfo snapshotInfo = getSnapshotsResponse.getSnapshots().get(0);
if (snapshotInfo.state() == SnapshotState.SUCCESS) {
@ -326,13 +326,13 @@ public class SharedClusterSnapshotRestoreTests extends AbstractSnapshotTests {
public void dataFileFailureDuringSnapshotTest() throws Exception {
Client client = client();
logger.info("--> creating repository");
PutRepositoryResponse putRepositoryResponse = run(client.admin().cluster().preparePutRepository("test-repo")
PutRepositoryResponse putRepositoryResponse = client.admin().cluster().preparePutRepository("test-repo")
.setType(MockRepositoryModule.class.getCanonicalName()).setSettings(
ImmutableSettings.settingsBuilder()
.put("location", newTempDir(LifecycleScope.TEST))
.put("random", randomAsciiOfLength(10))
.put("random_data_file_io_exception_rate", 0.1)
));
).get();
assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true));
createIndex("test-idx");
@ -343,10 +343,10 @@ public class SharedClusterSnapshotRestoreTests extends AbstractSnapshotTests {
index("test-idx", "doc", Integer.toString(i), "foo", "bar" + i);
}
refresh();
assertThat(run(client.prepareCount("test-idx")).getCount(), equalTo(100L));
assertThat(client.prepareCount("test-idx").get().getCount(), equalTo(100L));
logger.info("--> snapshot");
CreateSnapshotResponse createSnapshotResponse = run(client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap").setWaitForCompletion(true).setIndices("test-idx"));
CreateSnapshotResponse createSnapshotResponse = client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap").setWaitForCompletion(true).setIndices("test-idx").get();
if (createSnapshotResponse.getSnapshotInfo().totalShards() == createSnapshotResponse.getSnapshotInfo().successfulShards()) {
// If we are here, that means we didn't have any failures, let's check it
assertThat(getFailureCount("test-repo"), equalTo(0L));
@ -357,7 +357,7 @@ public class SharedClusterSnapshotRestoreTests extends AbstractSnapshotTests {
assertThat(shardFailure.nodeId(), notNullValue());
assertThat(shardFailure.index(), equalTo("test-idx"));
}
GetSnapshotsResponse getSnapshotsResponse = run(client.admin().cluster().prepareGetSnapshots("test-repo").addSnapshots("test-snap"));
GetSnapshotsResponse getSnapshotsResponse = client.admin().cluster().prepareGetSnapshots("test-repo").addSnapshots("test-snap").get();
assertThat(getSnapshotsResponse.getSnapshots().size(), equalTo(1));
SnapshotInfo snapshotInfo = getSnapshotsResponse.getSnapshots().get(0);
assertThat(snapshotInfo.shardFailures().size(), greaterThan(0));
@ -372,8 +372,8 @@ public class SharedClusterSnapshotRestoreTests extends AbstractSnapshotTests {
File repositoryLocation = newTempDir(LifecycleScope.TEST);
Client client = client();
logger.info("--> creating repository");
PutRepositoryResponse putRepositoryResponse = run(client.admin().cluster().preparePutRepository("test-repo")
.setType("fs").setSettings(ImmutableSettings.settingsBuilder().put("location", repositoryLocation)));
PutRepositoryResponse putRepositoryResponse = client.admin().cluster().preparePutRepository("test-repo")
.setType("fs").setSettings(ImmutableSettings.settingsBuilder().put("location", repositoryLocation)).get();
assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true));
createIndex("test-idx");
@ -384,31 +384,31 @@ public class SharedClusterSnapshotRestoreTests extends AbstractSnapshotTests {
index("test-idx", "doc", Integer.toString(i), "foo", "bar" + i);
}
refresh();
assertThat(run(client.prepareCount("test-idx")).getCount(), equalTo(100L));
assertThat(client.prepareCount("test-idx").get().getCount(), equalTo(100L));
logger.info("--> snapshot");
CreateSnapshotResponse createSnapshotResponse = run(client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap").setWaitForCompletion(true).setIndices("test-idx"));
CreateSnapshotResponse createSnapshotResponse = client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap").setWaitForCompletion(true).setIndices("test-idx").get();
assertThat(createSnapshotResponse.getSnapshotInfo().state(), equalTo(SnapshotState.SUCCESS));
assertThat(createSnapshotResponse.getSnapshotInfo().totalShards(), equalTo(createSnapshotResponse.getSnapshotInfo().successfulShards()));
logger.info("--> update repository with mock version");
putRepositoryResponse = run(client.admin().cluster().preparePutRepository("test-repo")
putRepositoryResponse = client.admin().cluster().preparePutRepository("test-repo")
.setType(MockRepositoryModule.class.getCanonicalName()).setSettings(
ImmutableSettings.settingsBuilder()
.put("location", repositoryLocation)
.put("random", randomAsciiOfLength(10))
.put("random_data_file_io_exception_rate", 0.3)
));
).get();
assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true));
// Test restore after index deletion
logger.info("--> delete index");
wipeIndex("test-idx");
wipeIndices("test-idx");
logger.info("--> restore index after deletion");
RestoreSnapshotResponse restoreSnapshotResponse = client.admin().cluster().prepareRestoreSnapshot("test-repo", "test-snap").setWaitForCompletion(true).execute().actionGet();
assertThat(restoreSnapshotResponse.getRestoreInfo().totalShards(), greaterThan(0));
ensureGreen();
CountResponse countResponse = run(client.prepareCount("test-idx"));
CountResponse countResponse = client.prepareCount("test-idx").get();
assertThat(countResponse.getCount(), equalTo(100L));
}
@ -420,8 +420,8 @@ public class SharedClusterSnapshotRestoreTests extends AbstractSnapshotTests {
File repositoryLocation = newTempDir(LifecycleScope.TEST);
Client client = client();
logger.info("--> creating repository");
PutRepositoryResponse putRepositoryResponse = run(client.admin().cluster().preparePutRepository("test-repo")
.setType("fs").setSettings(ImmutableSettings.settingsBuilder().put("location", repositoryLocation)));
PutRepositoryResponse putRepositoryResponse = client.admin().cluster().preparePutRepository("test-repo")
.setType("fs").setSettings(ImmutableSettings.settingsBuilder().put("location", repositoryLocation)).get();
assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true));
createIndex("test-idx");
@ -432,26 +432,26 @@ public class SharedClusterSnapshotRestoreTests extends AbstractSnapshotTests {
index("test-idx", "doc", Integer.toString(i), "foo", "bar" + i);
}
refresh();
assertThat(run(client.prepareCount("test-idx")).getCount(), equalTo(100L));
assertThat(client.prepareCount("test-idx").get().getCount(), equalTo(100L));
logger.info("--> snapshot");
CreateSnapshotResponse createSnapshotResponse = run(client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap").setWaitForCompletion(true).setIndices("test-idx"));
CreateSnapshotResponse createSnapshotResponse = client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap").setWaitForCompletion(true).setIndices("test-idx").get();
assertThat(createSnapshotResponse.getSnapshotInfo().state(), equalTo(SnapshotState.SUCCESS));
assertThat(createSnapshotResponse.getSnapshotInfo().totalShards(), equalTo(createSnapshotResponse.getSnapshotInfo().successfulShards()));
logger.info("--> update repository with mock version");
putRepositoryResponse = run(client.admin().cluster().preparePutRepository("test-repo")
putRepositoryResponse = client.admin().cluster().preparePutRepository("test-repo")
.setType(MockRepositoryModule.class.getCanonicalName()).setSettings(
ImmutableSettings.settingsBuilder()
.put("location", repositoryLocation)
.put("random", randomAsciiOfLength(10))
.put("random_data_file_io_exception_rate", 1.0) // Fail completely
));
).get();
assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true));
// Test restore after index deletion
logger.info("--> delete index");
wipeIndex("test-idx");
wipeIndices("test-idx");
logger.info("--> restore index after deletion");
ListenableActionFuture<RestoreSnapshotResponse> restoreSnapshotResponseFuture =
client.admin().cluster().prepareRestoreSnapshot("test-repo", "test-snap").setWaitForCompletion(true).execute();
@ -461,7 +461,7 @@ public class SharedClusterSnapshotRestoreTests extends AbstractSnapshotTests {
assertThat(waitForIndex("test-idx", TimeValue.timeValueSeconds(10)), equalTo(true));
logger.info("--> delete index");
wipeIndex("test-idx");
wipeIndices("test-idx");
logger.info("--> get restore results");
// Now read restore results and make sure it failed
RestoreSnapshotResponse restoreSnapshotResponse = restoreSnapshotResponseFuture.actionGet(TimeValue.timeValueSeconds(10));
@ -469,8 +469,8 @@ public class SharedClusterSnapshotRestoreTests extends AbstractSnapshotTests {
assertThat(restoreSnapshotResponse.getRestoreInfo().totalShards(), equalTo(restoreSnapshotResponse.getRestoreInfo().failedShards()));
logger.info("--> restoring working repository");
putRepositoryResponse = run(client.admin().cluster().preparePutRepository("test-repo")
.setType("fs").setSettings(ImmutableSettings.settingsBuilder().put("location", repositoryLocation)));
putRepositoryResponse = client.admin().cluster().preparePutRepository("test-repo")
.setType("fs").setSettings(ImmutableSettings.settingsBuilder().put("location", repositoryLocation)).get();
assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true));
logger.info("--> trying to restore index again");
@ -478,7 +478,7 @@ public class SharedClusterSnapshotRestoreTests extends AbstractSnapshotTests {
assertThat(restoreSnapshotResponse.getRestoreInfo().totalShards(), greaterThan(0));
assertThat(restoreSnapshotResponse.getRestoreInfo().failedShards(), equalTo(0));
ensureGreen();
CountResponse countResponse = run(client.prepareCount("test-idx"));
CountResponse countResponse = client.prepareCount("test-idx").get();
assertThat(countResponse.getCount(), equalTo(100L));
}
@ -488,17 +488,17 @@ public class SharedClusterSnapshotRestoreTests extends AbstractSnapshotTests {
Client client = client();
logger.info("--> creating repository");
PutRepositoryResponse putRepositoryResponse = run(client.admin().cluster().preparePutRepository("test-repo")
PutRepositoryResponse putRepositoryResponse = client.admin().cluster().preparePutRepository("test-repo")
.setType("fs").setSettings(ImmutableSettings.settingsBuilder()
.put("location", newTempDir(LifecycleScope.SUITE))
));
).get();
assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true));
logger.info("--> creating index that cannot be allocated");
run(prepareCreate("test-idx", 2, ImmutableSettings.builder().put(FilterAllocationDecider.INDEX_ROUTING_INCLUDE_GROUP + ".tag", "nowhere").put("index.number_of_shards", 3)));
prepareCreate("test-idx", 2, ImmutableSettings.builder().put(FilterAllocationDecider.INDEX_ROUTING_INCLUDE_GROUP + ".tag", "nowhere").put("index.number_of_shards", 3)).get();
logger.info("--> snapshot");
CreateSnapshotResponse createSnapshotResponse = run(client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap").setWaitForCompletion(true).setIndices("test-idx"));
CreateSnapshotResponse createSnapshotResponse = client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap").setWaitForCompletion(true).setIndices("test-idx").get();
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), equalTo(0));
assertThat(createSnapshotResponse.getSnapshotInfo().totalShards(), equalTo(3));
assertThat(createSnapshotResponse.getSnapshotInfo().shardFailures().size(), equalTo(3));
@ -512,10 +512,10 @@ public class SharedClusterSnapshotRestoreTests extends AbstractSnapshotTests {
File repo = newTempDir(LifecycleScope.SUITE);
logger.info("--> creating repository at " + repo.getAbsolutePath());
PutRepositoryResponse putRepositoryResponse = run(client.admin().cluster().preparePutRepository("test-repo")
PutRepositoryResponse putRepositoryResponse = client.admin().cluster().preparePutRepository("test-repo")
.setType("fs").setSettings(ImmutableSettings.settingsBuilder()
.put("location", repo).put("compress", false).put("chunk_size", atLeast(5))
));
).get();
assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true));
createIndex("test-idx");
@ -529,18 +529,18 @@ public class SharedClusterSnapshotRestoreTests extends AbstractSnapshotTests {
}
refresh();
logger.info("--> snapshot {}", i);
CreateSnapshotResponse createSnapshotResponse = run(client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap-" + i).setWaitForCompletion(true).setIndices("test-idx"));
CreateSnapshotResponse createSnapshotResponse = client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap-" + i).setWaitForCompletion(true).setIndices("test-idx").get();
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0));
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), equalTo(createSnapshotResponse.getSnapshotInfo().totalShards()));
// Store number of files after each snapshot
numberOfFiles[i] = numberOfFiles(repo);
}
assertThat(run(client.prepareCount("test-idx")).getCount(), equalTo(10L * numberOfSnapshots));
assertThat(client.prepareCount("test-idx").get().getCount(), equalTo(10L * numberOfSnapshots));
int numberOfFilesBeforeDeletion = numberOfFiles(repo);
logger.info("--> delete all snapshots except the first one and last one");
for (int i = 1; i < numberOfSnapshots - 1; i++) {
run(client.admin().cluster().prepareDeleteSnapshot("test-repo", "test-snap-" + i));
client.admin().cluster().prepareDeleteSnapshot("test-repo", "test-snap-" + i).get();
}
int numberOfFilesAfterDeletion = numberOfFiles(repo);
@ -548,7 +548,7 @@ public class SharedClusterSnapshotRestoreTests extends AbstractSnapshotTests {
assertThat(numberOfFilesAfterDeletion, lessThan(numberOfFilesBeforeDeletion));
logger.info("--> delete index");
wipeIndex("test-idx");
wipeIndices("test-idx");
logger.info("--> restore index");
String lastSnapshot = "test-snap-" + (numberOfSnapshots - 1);
@ -556,10 +556,10 @@ public class SharedClusterSnapshotRestoreTests extends AbstractSnapshotTests {
assertThat(restoreSnapshotResponse.getRestoreInfo().totalShards(), greaterThan(0));
ensureGreen();
assertThat(run(client.prepareCount("test-idx")).getCount(), equalTo(10L * numberOfSnapshots));
assertThat(client.prepareCount("test-idx").get().getCount(), equalTo(10L * numberOfSnapshots));
logger.info("--> delete the last snapshot");
run(client.admin().cluster().prepareDeleteSnapshot("test-repo", lastSnapshot));
client.admin().cluster().prepareDeleteSnapshot("test-repo", lastSnapshot).get();
logger.info("--> make sure that number of files is back to what it was when the first snapshot was made");
assertThat(numberOfFiles(repo), equalTo(numberOfFiles[0]));
}
@ -570,27 +570,27 @@ public class SharedClusterSnapshotRestoreTests extends AbstractSnapshotTests {
Client client = client();
logger.info("--> creating repository");
PutRepositoryResponse putRepositoryResponse = run(client.admin().cluster().preparePutRepository("test-repo")
PutRepositoryResponse putRepositoryResponse = client.admin().cluster().preparePutRepository("test-repo")
.setType("fs").setSettings(ImmutableSettings.settingsBuilder()
.put("location", newTempDir(LifecycleScope.SUITE))
));
).get();
assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true));
createIndex("test-idx", "test-idx-closed");
ensureGreen();
logger.info("--> closing index test-idx-closed");
run(client.admin().indices().prepareClose("test-idx-closed"));
ClusterStateResponse stateResponse = run(client.admin().cluster().prepareState());
client.admin().indices().prepareClose("test-idx-closed").get();
ClusterStateResponse stateResponse = client.admin().cluster().prepareState().get();
assertThat(stateResponse.getState().metaData().index("test-idx-closed").state(), equalTo(IndexMetaData.State.CLOSE));
assertThat(stateResponse.getState().routingTable().index("test-idx-closed"), nullValue());
logger.info("--> snapshot");
CreateSnapshotResponse createSnapshotResponse = run(client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap").setWaitForCompletion(true).setIndices("test-idx*"));
CreateSnapshotResponse createSnapshotResponse = client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap").setWaitForCompletion(true).setIndices("test-idx*").get();
assertThat(createSnapshotResponse.getSnapshotInfo().indices().size(), equalTo(1));
assertThat(createSnapshotResponse.getSnapshotInfo().shardFailures().size(), equalTo(0));
logger.info("--> deleting snapshot");
run(client.admin().cluster().prepareDeleteSnapshot("test-repo", "test-snap"));
client.admin().cluster().prepareDeleteSnapshot("test-repo", "test-snap").get();
}
@Test
@ -598,10 +598,10 @@ public class SharedClusterSnapshotRestoreTests extends AbstractSnapshotTests {
Client client = client();
logger.info("--> creating repository");
PutRepositoryResponse putRepositoryResponse = run(client.admin().cluster().preparePutRepository("test-repo")
PutRepositoryResponse putRepositoryResponse = client.admin().cluster().preparePutRepository("test-repo")
.setType("fs").setSettings(ImmutableSettings.settingsBuilder()
.put("location", newTempDir(LifecycleScope.SUITE))
));
).get();
assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true));
createIndex("test-idx-1", "test-idx-2");
@ -613,11 +613,11 @@ public class SharedClusterSnapshotRestoreTests extends AbstractSnapshotTests {
index("test-idx-2", "doc", Integer.toString(i), "foo", "bar" + i);
}
refresh();
assertThat(run(client.prepareCount("test-idx-1")).getCount(), equalTo(100L));
assertThat(run(client.prepareCount("test-idx-2")).getCount(), equalTo(100L));
assertThat(client.prepareCount("test-idx-1").get().getCount(), equalTo(100L));
assertThat(client.prepareCount("test-idx-2").get().getCount(), equalTo(100L));
logger.info("--> snapshot");
CreateSnapshotResponse createSnapshotResponse = run(client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap").setWaitForCompletion(true).setIndices("test-idx-1", "test-idx-2"));
CreateSnapshotResponse createSnapshotResponse = client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap").setWaitForCompletion(true).setIndices("test-idx-1", "test-idx-2").get();
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0));
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), equalTo(createSnapshotResponse.getSnapshotInfo().totalShards()));
@ -627,11 +627,11 @@ public class SharedClusterSnapshotRestoreTests extends AbstractSnapshotTests {
assertThat(restoreSnapshotResponse.getRestoreInfo().totalShards(), greaterThan(0));
ensureGreen();
assertThat(run(client.prepareCount("test-idx-1-copy")).getCount(), equalTo(100L));
assertThat(run(client.prepareCount("test-idx-2-copy")).getCount(), equalTo(100L));
assertThat(client.prepareCount("test-idx-1-copy").get().getCount(), equalTo(100L));
assertThat(client.prepareCount("test-idx-2-copy").get().getCount(), equalTo(100L));
logger.info("--> close indices");
run(client.admin().indices().prepareClose("test-idx-1", "test-idx-2-copy"));
client.admin().indices().prepareClose("test-idx-1", "test-idx-2-copy").get();
logger.info("--> restore indices with different names");
restoreSnapshotResponse = client.admin().cluster().prepareRestoreSnapshot("test-repo", "test-snap")
@ -670,14 +670,14 @@ public class SharedClusterSnapshotRestoreTests extends AbstractSnapshotTests {
Client client = client();
File repositoryLocation = newTempDir(LifecycleScope.TEST);
logger.info("--> creating repository");
PutRepositoryResponse putRepositoryResponse = run(client.admin().cluster().preparePutRepository("test-repo")
PutRepositoryResponse putRepositoryResponse = client.admin().cluster().preparePutRepository("test-repo")
.setType(MockRepositoryModule.class.getCanonicalName()).setSettings(
ImmutableSettings.settingsBuilder()
.put("location", repositoryLocation)
.put("random", randomAsciiOfLength(10))
.put("random_data_file_blocking_rate", 0.1)
.put("wait_after_unblock", 200)
));
).get();
assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true));
// Create index on 2 nodes and make sure each node has a primary by setting no replicas
@ -688,15 +688,15 @@ public class SharedClusterSnapshotRestoreTests extends AbstractSnapshotTests {
index("test-idx", "doc", Integer.toString(i), "foo", "bar" + i);
}
refresh();
assertThat(run(client.prepareCount("test-idx")).getCount(), equalTo(100L));
assertThat(client.prepareCount("test-idx").get().getCount(), equalTo(100L));
logger.info("--> snapshot");
run(client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap").setWaitForCompletion(false).setIndices("test-idx"));
client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap").setWaitForCompletion(false).setIndices("test-idx").get();
String blockedNode = waitForCompletionOrBlock(cluster().nodesInclude("test-idx"), "test-repo", "test-snap", TimeValue.timeValueSeconds(60));
if (blockedNode != null) {
logger.info("--> move shards away from the node");
ImmutableSettings.Builder excludeSettings = ImmutableSettings.builder().put("index.routing.allocation.exclude._name", blockedNode);
run(client().admin().indices().prepareUpdateSettings("test-idx").setSettings(excludeSettings));
client().admin().indices().prepareUpdateSettings("test-idx").setSettings(excludeSettings).get();
logger.info("--> execution was blocked on node [{}], moving shards away from this node", blockedNode);
unblock("test-repo");
logger.info("--> waiting for completion");
@ -706,18 +706,18 @@ public class SharedClusterSnapshotRestoreTests extends AbstractSnapshotTests {
} else {
logger.info("--> done without blocks");
}
ImmutableList<SnapshotInfo> snapshotInfos = run(client().admin().cluster().prepareGetSnapshots("test-repo").setSnapshots("test-snap")).getSnapshots();
ImmutableList<SnapshotInfo> snapshotInfos = client().admin().cluster().prepareGetSnapshots("test-repo").setSnapshots("test-snap").get().getSnapshots();
assertThat(snapshotInfos.size(), equalTo(1));
assertThat(snapshotInfos.get(0).state(), equalTo(SnapshotState.SUCCESS));
assertThat(snapshotInfos.get(0).shardFailures().size(), equalTo(0));
logger.info("--> delete index");
wipeIndex("test-idx");
wipeIndices("test-idx");
logger.info("--> replace mock repository with real one at the same location");
putRepositoryResponse = run(client.admin().cluster().preparePutRepository("test-repo")
putRepositoryResponse = client.admin().cluster().preparePutRepository("test-repo")
.setType("fs").setSettings(ImmutableSettings.settingsBuilder().put("location", repositoryLocation)
));
).get();
assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true));
logger.info("--> restore index");
@ -725,7 +725,7 @@ public class SharedClusterSnapshotRestoreTests extends AbstractSnapshotTests {
assertThat(restoreSnapshotResponse.getRestoreInfo().totalShards(), greaterThan(0));
ensureGreen();
assertThat(run(client.prepareCount("test-idx")).getCount(), equalTo(100L));
assertThat(client.prepareCount("test-idx").get().getCount(), equalTo(100L));
}
@Test
@ -734,12 +734,12 @@ public class SharedClusterSnapshotRestoreTests extends AbstractSnapshotTests {
logger.info("--> creating repository");
File repositoryLocation = newTempDir(LifecycleScope.SUITE);
PutRepositoryResponse putRepositoryResponse = run(client.admin().cluster().preparePutRepository("test-repo")
PutRepositoryResponse putRepositoryResponse = client.admin().cluster().preparePutRepository("test-repo")
.setType("fs").setSettings(ImmutableSettings.settingsBuilder()
.put("location", repositoryLocation)
.put("compress", randomBoolean())
.put("chunk_size", randomIntBetween(5, 100))
));
).get();
assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true));
createIndex("test-idx");
@ -750,14 +750,14 @@ public class SharedClusterSnapshotRestoreTests extends AbstractSnapshotTests {
index("test-idx", "doc", Integer.toString(i), "foo", "bar" + i);
}
refresh();
assertThat(run(client.prepareCount("test-idx")).getCount(), equalTo(100L));
assertThat(client.prepareCount("test-idx").get().getCount(), equalTo(100L));
logger.info("--> snapshot");
CreateSnapshotResponse createSnapshotResponse = run(client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap").setWaitForCompletion(true).setIndices("test-idx"));
CreateSnapshotResponse createSnapshotResponse = client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap").setWaitForCompletion(true).setIndices("test-idx").get();
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0));
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), equalTo(createSnapshotResponse.getSnapshotInfo().totalShards()));
assertThat(run(client.admin().cluster().prepareGetSnapshots("test-repo").setSnapshots("test-snap")).getSnapshots().get(0).state(), equalTo(SnapshotState.SUCCESS));
assertThat(client.admin().cluster().prepareGetSnapshots("test-repo").setSnapshots("test-snap").get().getSnapshots().get(0).state(), equalTo(SnapshotState.SUCCESS));
logger.info("--> delete index");
wipeIndices("test-idx");
@ -766,19 +766,19 @@ public class SharedClusterSnapshotRestoreTests extends AbstractSnapshotTests {
wipeRepositories("test-repo");
logger.info("--> create read-only URL repository");
putRepositoryResponse = run(client.admin().cluster().preparePutRepository("url-repo")
putRepositoryResponse = client.admin().cluster().preparePutRepository("url-repo")
.setType("url").setSettings(ImmutableSettings.settingsBuilder()
.put("url", repositoryLocation.toURI().toURL())
));
).get();
assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true));
logger.info("--> restore index after deletion");
RestoreSnapshotResponse restoreSnapshotResponse = client.admin().cluster().prepareRestoreSnapshot("url-repo", "test-snap").setWaitForCompletion(true).setIndices("test-idx").execute().actionGet();
assertThat(restoreSnapshotResponse.getRestoreInfo().totalShards(), greaterThan(0));
ensureGreen();
assertThat(run(client.prepareCount("test-idx")).getCount(), equalTo(100L));
assertThat(client.prepareCount("test-idx").get().getCount(), equalTo(100L));
logger.info("--> list available shapshots");
GetSnapshotsResponse getSnapshotsResponse = run(client.admin().cluster().prepareGetSnapshots("url-repo"));
GetSnapshotsResponse getSnapshotsResponse = client.admin().cluster().prepareGetSnapshots("url-repo").get();
assertThat(getSnapshotsResponse.getSnapshots(), notNullValue());
assertThat(getSnapshotsResponse.getSnapshots().size(), equalTo(1));
}

View File

@ -183,7 +183,7 @@ public abstract class AbstractTermVectorTests extends ElasticsearchIntegrationTe
}
protected void createIndexBasedOnFieldSettings(TestFieldSetting[] fieldSettings, int number_of_shards) throws IOException {
wipeIndex("test");
wipeIndices("test");
XContentBuilder mappingBuilder = jsonBuilder();
mappingBuilder.startObject().startObject("type1").startObject("properties");
for (TestFieldSetting field : fieldSettings) {
@ -196,7 +196,7 @@ public abstract class AbstractTermVectorTests extends ElasticsearchIntegrationTe
settings.put("number_of_shards", number_of_shards);
}
mappingBuilder.endObject().endObject().endObject();
run(prepareCreate("test").addMapping("type1", mappingBuilder).setSettings(settings));
prepareCreate("test").addMapping("type1", mappingBuilder).setSettings(settings).get();
ensureYellow();
}

View File

@ -372,7 +372,7 @@ public class GetTermVectorTests extends AbstractTermVectorTests {
continue;
}
TermVectorResponse response = run(request);
TermVectorResponse response = request.get();
Fields luceneTermVectors = getTermVectorsFromLucene(directoryReader, test.doc);
validateResponse(response, luceneTermVectors, test);
} catch (Throwable t) {

View File

@ -41,7 +41,7 @@ public class MultiTermVectorsTests extends AbstractTermVectorTests {
requestBuilder.add(getRequestForConfig(test).request());
}
MultiTermVectorsItemResponse[] responseItems = run(requestBuilder).getResponses();
MultiTermVectorsItemResponse[] responseItems = requestBuilder.get().getResponses();
for (int i = 0; i < testConfigs.length; i++) {
TestConfig test = testConfigs[i];

View File

@ -20,13 +20,9 @@ package org.elasticsearch.test;
import com.carrotsearch.randomizedtesting.SeedUtils;
import com.google.common.base.Joiner;
import com.google.common.collect.Iterators;
import org.apache.lucene.util.AbstractRandomizedTest.IntegrationTests;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.ShardOperationFailedException;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
@ -43,17 +39,11 @@ import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.ClearScrollResponse;
import org.elasticsearch.action.support.IgnoreIndices;
import org.elasticsearch.action.support.broadcast.BroadcastOperationRequestBuilder;
import org.elasticsearch.action.support.broadcast.BroadcastOperationResponse;
import org.elasticsearch.client.AdminClient;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.Requests;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.settings.ImmutableSettings;
@ -61,7 +51,6 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.merge.policy.*;
import org.elasticsearch.indices.IndexAlreadyExistsException;
import org.elasticsearch.indices.IndexMissingException;
import org.elasticsearch.indices.IndexTemplateMissingException;
import org.elasticsearch.rest.RestStatus;
@ -85,38 +74,98 @@ import static org.hamcrest.Matchers.emptyIterable;
import static org.hamcrest.Matchers.equalTo;
/**
* This abstract base testcase reuses a cluster instance internally and might
* start an abitrary number of nodes in the background. This class might in the
* future add random configureation options to created indices etc. unless
* unless they are explicitly defined by the test.
* {@link ElasticsearchIntegrationTest} is an abstract base class to run integration
* tests against a JVM private Elasticsearch Cluster. The test class supports 3 different
* cluster scopes.
* <ul>
* <li>{@link Scope#GLOBAL} - uses a cluster shared across test suites. This cluster doesn't allow any modifications to
* the cluster settings and will fail if any persistent cluster settings are applied during tear down.</li>
* <li>{@link Scope#TEST} - uses a new cluster for each individual test method.</li>
* <li>{@link Scope#SUITE} - uses a cluster shared across all test method in the same suite</li>
* </ul>
* <p/>
* The most common test scope it {@link Scope#GLOBAL} which shares a cluster per JVM. This cluster is only set-up once
* and can be used as long as the tests work on a per index basis without changing any cluster wide settings or require
* any specific node configuration. This is the best performing option since it sets up the cluster only once.
* <p/>
* If the tests need specific node settings or change persistent and/or transient cluster settings either {@link Scope#TEST}
* or {@link Scope#SUITE} should be used. To configure a scope for the test cluster the {@link ClusterScope} annotation
* should be used, here is an example:
* <pre>
* @ClusterScope(scope=Scope.TEST)
* public class SomeIntegrationTest extends ElasticsearchIntegrationTest {
* @Test
* public void testMethod() {}
* }
* </pre>
*
* If no {@link ClusterScope} annotation is present on an integration test the default scope it {@link Scope#GLOBAL}
* <p/>
* A test cluster creates a set of nodes in the background before the test starts. The number of nodes in the cluster is
* determined at random and can change across tests. The minimum number of nodes in the shared global cluster is <code>2</code>.
* For other scopes the {@link ClusterScope} allows configuring the initial number of nodes that are created before
* the tests start.
*
* <pre>
* @ClusterScope(scope=Scope.SUITE, numNodes=3)
* public class SomeIntegrationTest extends ElasticsearchIntegrationTest {
* @Test
* public void testMethod() {}
* }
* </pre>
* <p/>
* Note, the {@link ElasticsearchIntegrationTest} uses randomized settings on a cluster and index level. For instance
* each test might use different directory implementation for each test or will return a random client to one of the
* nodes in the cluster for each call to {@link #client()}. Test failures might only be reproducible if the correct
* system properties are passed to the test execution environment.
*
* <p>
* This test wipes all indices before a testcase is executed and uses
* elasticsearch features like allocation filters to ensure an index is
* allocated only on a certain number of nodes. The test doesn't expose explicit
* information about the client or which client is returned, clients might be
* node clients or transport clients and the returned client might be rotated.
* This class supports the following system properties (passed with -Dkey=value to the application)
* <ul>
* <li>-D{@value #TESTS_CLIENT_RATIO} - a double value in the interval [0..1] which defines the ration between node and transport clients used</li>
* <li>-D{@value #TESTS_CLUSTER_SEED} - a random seed used to initialize the clusters random context.
* <li>-D{@value #INDEX_SEED_SETTING} - a random seed used to initialize the index random context.
* </ul>
* </p>
* <p/>
* Tests that need more explict control over the cluster or that need to change
* the cluster state aside of per-index settings should not use this class as a
* baseclass. If your test modifies the cluster state with persistent or
* transient settings the baseclass will raise and error.
*/
@Ignore
@IntegrationTests
public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase {
public static final String INDEX_SEED_SETTING = "index.tests.seed";
/**
* The random seed for the shared test cluster used in the current JVM.
*/
public static final long SHARED_CLUSTER_SEED = clusterSeed();
private static final TestCluster GLOBAL_CLUSTER = new TestCluster(SHARED_CLUSTER_SEED, TestCluster.clusterName("shared", ElasticsearchTestCase.CHILD_VM_ID, SHARED_CLUSTER_SEED));
/**
* Key used to set the transport client ratio via the commandline -D{@value #TESTS_CLIENT_RATIO}
*/
public static final String TESTS_CLIENT_RATIO = "tests.client.ratio";
/**
* Key used to set the shared cluster random seed via the commandline -D{@value #TESTS_CLUSTER_SEED}
*/
public static final String TESTS_CLUSTER_SEED = "tests.cluster_seed";
/**
* Key used to retrieve the index random seed from the index settings on a running node.
* The value of this seed can be used to initialize a random context for a specific index.
* It's set once per test via a generic index template.
*/
public static final String INDEX_SEED_SETTING = "index.tests.seed";
/**
* The current cluster depending on the configured {@link Scope}.
* By default if no {@link ClusterScope} is configured this will hold a reference to the global cluster carried
* on across test suites.
*/
private static TestCluster currentCluster;
private static final double TRANSPORT_CLIENT_RATIO = transportClientRatio();
private static final TestCluster globalCluster = new TestCluster(SHARED_CLUSTER_SEED, TestCluster.clusterName("shared", ElasticsearchTestCase.CHILD_VM_ID, SHARED_CLUSTER_SEED));
private static TestCluster currentCluster;
private static final Map<Class<?>, TestCluster> clusters = new IdentityHashMap<Class<?>, TestCluster>();
@Before
@ -125,7 +174,7 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase
switch (currentClusterScope) {
case GLOBAL:
clearClusters();
currentCluster = globalCluster;
currentCluster = GLOBAL_CLUSTER;
break;
case SUITE:
currentCluster = buildAndPutCluster(currentClusterScope, false);
@ -158,7 +207,9 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase
private void clearClusters() throws IOException {
if (!clusters.isEmpty()) {
IOUtils.close(clusters.values());
for(TestCluster cluster : clusters.values()) {
cluster.close();
}
clusters.clear();
}
}
@ -201,9 +252,14 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase
public static Client client() {
return cluster().client();
}
/**
* Creates a randomized index template. This template is used to pass in randomized settings on a
* per index basis.
*/
private static void randomIndexTemplate() {
if (cluster().numNodes() > 0) {
// TODO move settings for random directory etc here into the index based randomized settings.
if (cluster().size() > 0) {
client().admin().indices().preparePutTemplate("random_index_template")
.setTemplate("*")
.setOrder(0)
@ -239,18 +295,20 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase
return cluster();
}
public ImmutableSettings.Builder randomSettingsBuilder() {
// TODO RANDOMIZE
return ImmutableSettings.builder();
/**
* Returns a settings object used in {@link #createIndex(String...)} and {@link #prepareCreate(String)} and friends.
* This method can be overwritten by subclasses to set defaults for the indices that are created by the test.
* By default it returns an empty settings object.
*/
public Settings indexSettings() {
return ImmutableSettings.EMPTY;
}
// TODO Randomize MergePolicyProviderBase.INDEX_COMPOUND_FORMAT [true|false|"true"|"false"|[0..1]| toString([0..1])]
public Settings getSettings() {
return randomSettingsBuilder().build();
}
/**
* Deletes the given indices from the tests cluster. If no index name is passed to this method
* all indices are removed.
*/
public static void wipeIndices(String... names) {
if (cluster().numNodes() > 0) {
if (cluster().size() > 0) {
try {
assertAcked(client().admin().indices().prepareDelete(names));
} catch (IndexMissingException e) {
@ -259,15 +317,12 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase
}
}
public static void wipeIndex(String name) {
wipeIndices(name);
}
/**
* Deletes index templates, support wildcard notation.
* If no template name is passed to this method all templates are removed.
*/
public static void wipeTemplates(String... templates) {
if (cluster().numNodes() > 0) {
if (cluster().size() > 0) {
// if nothing is provided, delete all
if (templates.length == 0) {
templates = new String[]{"*"};
@ -282,25 +337,59 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase
}
}
public void createIndex(String... names) {
/**
* Creates one or more indices and asserts that the indices are acknowledged. If one of the indices
* already exists this method will fail and wipe all the indices created so far.
*/
public final void createIndex(String... names) {
List<String> created = new ArrayList<String>();
for (String name : names) {
boolean success = false;
try {
assertAcked(prepareCreate(name).setSettings(getSettings()));
continue;
} catch (IndexAlreadyExistsException ex) {
wipeIndex(name);
assertAcked(prepareCreate(name));
created.add(name);
success = true;
} finally {
if (!success) {
wipeIndices(created.toArray(new String[0]));
}
}
assertAcked(prepareCreate(name).setSettings(getSettings()));
}
}
public CreateIndexRequestBuilder prepareCreate(String index, int numNodes) {
/**
* Creates a new {@link CreateIndexRequestBuilder} with the settings obtained from {@link #indexSettings()}.
*/
public final CreateIndexRequestBuilder prepareCreate(String index) {
return client().admin().indices().prepareCreate(index).setSettings(indexSettings());
}
/**
* Creates a new {@link CreateIndexRequestBuilder} with the settings obtained from {@link #indexSettings()}.
* The index that is created with this builder will only be allowed to allocate on the number of nodes passed to this
* method.
* <p>
* This method uses allocation deciders to filter out certain nodes to allocate the created index on. It defines allocation
* rules based on <code>index.routing.allocation.exclude._name</code>.
* </p>
*/
public final CreateIndexRequestBuilder prepareCreate(String index, int numNodes) {
return prepareCreate(index, numNodes, ImmutableSettings.builder());
}
/**
* Creates a new {@link CreateIndexRequestBuilder} with the settings obtained from {@link #indexSettings()}.
* The index that is created with this builder will only be allowed to allocate on the number of nodes passed to this
* method.
* <p>
* This method uses allocation deciders to filter out certain nodes to allocate the created index on. It defines allocation
* rules based on <code>index.routing.allocation.exclude._name</code>.
* </p>
*/
public CreateIndexRequestBuilder prepareCreate(String index, int numNodes, ImmutableSettings.Builder builder) {
cluster().ensureAtLeastNumNodes(numNodes);
Settings settings = getSettings();
Settings settings = indexSettings();
builder.put(settings);
if (numNodes > 0) {
getExcludeSettings(index, numNodes, builder);
@ -314,30 +403,17 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase
return builder;
}
public Set<String> getExcludeNodes(String index, int num) {
Set<String> nodeExclude = cluster().nodeExclude(index);
Set<String> nodesInclude = cluster().nodesInclude(index);
if (nodesInclude.size() < num) {
Iterator<String> limit = Iterators.limit(nodeExclude.iterator(), num - nodesInclude.size());
while (limit.hasNext()) {
limit.next();
limit.remove();
}
} else {
Iterator<String> limit = Iterators.limit(nodesInclude.iterator(), nodesInclude.size() - num);
while (limit.hasNext()) {
nodeExclude.add(limit.next());
limit.remove();
}
}
return nodeExclude;
}
public void allowNodes(String index, int numNodes) {
cluster().ensureAtLeastNumNodes(numNodes);
/**
* Restricts the given index to be allocated on <code>n</code> nodes using the allocation deciders.
* Yet if the shards can't be allocated on any other node shards for this index will remain allocated on
* more than <code>n</code> nodes.
*/
public void allowNodes(String index, int n) {
assert index != null;
cluster().ensureAtLeastNumNodes(n);
ImmutableSettings.Builder builder = ImmutableSettings.builder();
if (numNodes > 0) {
getExcludeSettings(index, numNodes, builder);
if (n > 0) {
getExcludeSettings(index, n, builder);
}
Settings build = builder.build();
if (!build.getAsMap().isEmpty()) {
@ -345,10 +421,11 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase
}
}
public CreateIndexRequestBuilder prepareCreate(String index) {
return client().admin().indices().prepareCreate(index).setSettings(getSettings());
}
/**
* Ensures the cluster has a green state via the cluster health API. This method will also wait for relocations.
* It is useful to ensure that all action on the cluster have finished and all shards that were currently relocating
* are now allocated and started.
*/
public ClusterHealthStatus ensureGreen() {
ClusterHealthResponse actionGet = client().admin().cluster()
.health(Requests.clusterHealthRequest().waitForGreenStatus().waitForEvents(Priority.LANGUID).waitForRelocatingShards(0)).actionGet();
@ -360,10 +437,17 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase
return actionGet.getStatus();
}
/**
* Waits for all relocating shards to become active using the cluster health API.
*/
public ClusterHealthStatus waitForRelocation() {
return waitForRelocation(null);
}
/**
* Waits for all relocating shards to become active and the cluster has reached the given health status
* using the cluster health API.
*/
public ClusterHealthStatus waitForRelocation(ClusterHealthStatus status) {
ClusterHealthRequest request = Requests.clusterHealthRequest().waitForRelocatingShards(0);
if (status != null) {
@ -381,6 +465,9 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase
return actionGet.getStatus();
}
/**
* Ensures the cluster has a yellow state via the cluster health API.
*/
public ClusterHealthStatus ensureYellow() {
ClusterHealthResponse actionGet = client().admin().cluster()
.health(Requests.clusterHealthRequest().waitForRelocatingShards(0).waitForYellowStatus().waitForEvents(Priority.LANGUID)).actionGet();
@ -391,33 +478,61 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase
return actionGet.getStatus();
}
public static String commaString(Iterable<String> strings) {
return Joiner.on(',').join(strings);
}
// utils
protected IndexResponse index(String index, String type, XContentBuilder source) {
/**
* Syntactic sugar for:
* <pre>
* client().prepareIndex(index, type).setSource(source).execute().actionGet();
* </pre>
*/
protected final IndexResponse index(String index, String type, XContentBuilder source) {
return client().prepareIndex(index, type).setSource(source).execute().actionGet();
}
protected IndexResponse index(String index, String type, String id, Map<String, Object> source) {
/**
* Syntactic sugar for:
* <pre>
* client().prepareIndex(index, type).setSource(source).execute().actionGet();
* </pre>
*/
protected final IndexResponse index(String index, String type, String id, Map<String, Object> source) {
return client().prepareIndex(index, type, id).setSource(source).execute().actionGet();
}
protected GetResponse get(String index, String type, String id) {
/**
* Syntactic sugar for:
* <pre>
* client().prepareGet(index, type, id).execute().actionGet();
* </pre>
*/
protected final GetResponse get(String index, String type, String id) {
return client().prepareGet(index, type, id).execute().actionGet();
}
protected IndexResponse index(String index, String type, String id, XContentBuilder source) {
/**
* Syntactic sugar for:
* <pre>
* return client().prepareIndex(index, type, id).setSource(source).execute().actionGet();
* </pre>
*/
protected final IndexResponse index(String index, String type, String id, XContentBuilder source) {
return client().prepareIndex(index, type, id).setSource(source).execute().actionGet();
}
protected IndexResponse index(String index, String type, String id, Object... source) {
/**
* Syntactic sugar for:
* <pre>
* return client().prepareIndex(index, type, id).setSource(source).execute().actionGet();
* </pre>
*/
protected final IndexResponse index(String index, String type, String id, Object... source) {
return client().prepareIndex(index, type, id).setSource(source).execute().actionGet();
}
protected RefreshResponse refresh() {
/**
* Waits for relocations and refreshes all indices in the cluster.
* @see #waitForRelocation()
*/
protected final RefreshResponse refresh() {
waitForRelocation();
// TODO RANDOMIZE with flush?
RefreshResponse actionGet = client().admin().indices().prepareRefresh().execute().actionGet();
@ -425,16 +540,22 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase
return actionGet;
}
protected void flushAndRefresh() {
/**
* Flushes and refreshes all indices in the cluster
*/
protected final void flushAndRefresh() {
flush(true);
refresh();
}
protected FlushResponse flush() {
/**
* Flushes all indices in the cluster
*/
protected final FlushResponse flush() {
return flush(true);
}
protected FlushResponse flush(boolean ignoreNotAllowed) {
private FlushResponse flush(boolean ignoreNotAllowed) {
waitForRelocation();
FlushResponse actionGet = client().admin().indices().prepareFlush().execute().actionGet();
if (ignoreNotAllowed) {
@ -447,6 +568,9 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase
return actionGet;
}
/**
* Waits for all relocations and optimized all indices in the cluster to 1 segment.
*/
protected OptimizeResponse optimize() {
waitForRelocation();
OptimizeResponse actionGet = client().admin().indices().prepareOptimize().execute().actionGet();
@ -454,48 +578,29 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase
return actionGet;
}
protected Set<String> nodeIdsWithIndex(String... indices) {
ClusterState state = client().admin().cluster().prepareState().execute().actionGet().getState();
GroupShardsIterator allAssignedShardsGrouped = state.routingTable().allAssignedShardsGrouped(indices, true);
Set<String> nodes = new HashSet<String>();
for (ShardIterator shardIterator : allAssignedShardsGrouped) {
for (ShardRouting routing : shardIterator.asUnordered()) {
if (routing.active()) {
nodes.add(routing.currentNodeId());
}
}
}
return nodes;
}
protected int numAssignedShards(String... indices) {
ClusterState state = client().admin().cluster().prepareState().execute().actionGet().getState();
GroupShardsIterator allAssignedShardsGrouped = state.routingTable().allAssignedShardsGrouped(indices, true);
return allAssignedShardsGrouped.size();
}
/**
* Returns <code>true</code> iff the given index exists otherwise <code>false</code>
*/
protected boolean indexExists(String index) {
IndicesExistsResponse actionGet = client().admin().indices().prepareExists(index).execute().actionGet();
return actionGet.isExists();
}
/**
* Returns a random admin client. This client can either be a node or a transport client pointing to any of
* the nodes in the cluster.
*/
protected AdminClient admin() {
return client().admin();
}
protected <Res extends ActionResponse> Res run(ActionRequestBuilder<?, Res, ?> builder) {
Res actionGet = builder.execute().actionGet();
return actionGet;
}
protected <Res extends BroadcastOperationResponse> Res run(BroadcastOperationRequestBuilder<?, Res, ?> builder) {
Res actionGet = builder.execute().actionGet();
assertNoFailures(actionGet);
return actionGet;
}
// TODO move this into a base class for integration tests
/**
* Indexes the given {@link IndexRequestBuilder} instances randomly. It shuffles the given builders and either
* indexes they in a blocking or async fashion. This is very useful to catch problems that relate to internal document
* ids or index segment creations. Some features might have bug when a given document is the first or the last in a
* segment or if only one document is in a segment etc. This method prevents issues like this by randomizing the index
* layout.
*/
public void indexRandom(boolean forceRefresh, IndexRequestBuilder... builders) throws InterruptedException, ExecutionException {
if (builders.length == 0) {
return;
@ -573,8 +678,6 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase
latches.add(l);
return l;
}
private class LatchedActionListener<Response> implements ActionListener<Response> {
private final CountDownLatch latch;
@ -619,14 +722,34 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase
}
/**
* Clears the given scroll Ids
*/
public void clearScroll(String... scrollIds) {
ClearScrollResponse clearResponse = client().prepareClearScroll()
.setScrollIds(Arrays.asList(scrollIds)).get();
assertThat(clearResponse.isSucceeded(), equalTo(true));
}
/**
* The scope of a test cluster used together with
* {@link ClusterScope} annonations on {@link ElasticsearchIntegrationTest} subclasses.
*/
public static enum Scope {
GLOBAL, SUITE, TEST;
/**
* A globally shared cluster. This cluster doesn't allow modification of transient or persistent
* cluster settings.
*/
GLOBAL,
/**
* A cluster shared across all method in a single test suite
*/
SUITE,
/**
* A test exclusive test cluster
*/
TEST;
}
private ClusterScope getAnnotation(Class<?> clazz) {
@ -650,13 +773,19 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase
ClusterScope annotation = getAnnotation(this.getClass());
return annotation == null ? -1 : annotation.numNodes();
}
/**
* This method is used to obtain settings for the <tt>Nth</tt> node in the cluster.
* Nodes in this cluster are associated with an ordinal number such that nodes can
* be started with specific configurations. This method might be called multiple
* times with the same ordinal and is expected to return the same value for each invocation.
* In other words subclasses must ensure this method is idempotent.
*/
protected Settings nodeSettings(int nodeOrdinal) {
return ImmutableSettings.EMPTY;
}
protected TestCluster buildTestCluster(Scope scope) {
private TestCluster buildTestCluster(Scope scope) {
long currentClusterSeed = randomLong();
int numNodes = getNumNodes();
NodeSettingsSource nodeSettingsSource;
@ -677,31 +806,57 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase
return new TestCluster(currentClusterSeed, numNodes, TestCluster.clusterName(scope.name(), ElasticsearchTestCase.CHILD_VM_ID, currentClusterSeed), nodeSettingsSource);
}
/**
* Defines a cluster scope for a {@link ElasticsearchIntegrationTest} subclass.
* By default if no {@link ClusterScope} annotation is present {@link Scope#GLOBAL} is used
* together with randomly chosen settings like number of nodes etc.
*/
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE})
public @interface ClusterScope {
Scope scope() default Scope.GLOBAL;
/**
* Returns the scope. {@link Scope#GLOBAL} is default.
*/
Scope scope() default Scope.GLOBAL;
/**
* Returns the number of nodes in the cluster. Default is <tt>-1</tt> which means
* a random number of nodes but at least <code>2</code></tt> is used./
*/
int numNodes() default -1;
/**
* Returns the transport client ratio. By default this returns <code>-1</code> which means a random
* ratio in the interval <code>[0..1]</code> is used.
*/
double transportClientRatio() default -1;
}
private static long clusterSeed() {
String property = System.getProperty("tests.cluster_seed");
String property = System.getProperty(TESTS_CLUSTER_SEED);
if (property == null || property.isEmpty()) {
return System.nanoTime();
}
return SeedUtils.parseSeed(property);
}
/**
* Returns the client ratio configured via
*/
private static double transportClientRatio() {
String property = System.getProperty("tests.client.ratio");
String property = System.getProperty(TESTS_CLIENT_RATIO);
if (property == null || property.isEmpty()) {
return Double.NaN;
}
return Double.parseDouble(property);
}
/**
* Returns the transport client ratio from the class level annotation or via
* {@link System#getProperty(String)} if available. If both are not available this will
* return a random ratio in the interval <tt>[0..1]</tt>
*/
private double getPerTestTransportClientRatio() {
final ClusterScope annotation = getAnnotation(this.getClass());
double perTestRatio = -1;

View File

@ -27,6 +27,10 @@ import org.apache.lucene.util.LuceneTestCase.SuppressCodecs;
import org.apache.lucene.util.TimeUnits;
import org.elasticsearch.test.junit.listeners.ReproduceInfoPrinter;
/**
* Base testcase for lucene based testing. This class should be used if low level lucene features are tested.
*/
@Listeners({
ReproduceInfoPrinter.class
})

View File

@ -44,6 +44,9 @@ import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;
/**
* Base testcase for randomized unit testing with Elasticsearch
*/
@ThreadLeakFilters(defaultFilters = true, filters = {ElasticsearchThreadFilter.class})
@ThreadLeakScope(Scope.NONE)
@TimeoutSuite(millis = TimeUnits.HOUR) // timeout the suite after 1h and fail the test.

View File

@ -21,7 +21,10 @@ package org.elasticsearch.test;
import com.carrotsearch.randomizedtesting.ThreadFilter;
public class ElasticsearchThreadFilter implements ThreadFilter {
/**
* Simple thread filter for randomized runner
*/
public final class ElasticsearchThreadFilter implements ThreadFilter {
@Override
public boolean reject(Thread t) {
return true;

View File

@ -31,6 +31,11 @@ import org.elasticsearch.test.junit.listeners.ReproduceInfoPrinter;
@ThreadLeakFilters(defaultFilters = true, filters = {ElasticsearchThreadFilter.class})
@ThreadLeakScope(Scope.NONE)
@TimeoutSuite(millis = TimeUnits.HOUR)
/**
* Basic test case for token streams. the assertion methods in this class will
* run basic checks to enforce correct behavior of the token streams.
*/
public abstract class ElasticsearchTokenStreamTestCase extends BaseTokenStreamTestCase {
}

View File

@ -24,7 +24,7 @@ import org.elasticsearch.common.settings.Settings;
import java.util.Map;
public abstract class NodeSettingsSource {
abstract class NodeSettingsSource {
public static final NodeSettingsSource EMPTY = new NodeSettingsSource() {
@Override

View File

@ -19,11 +19,9 @@
package org.elasticsearch.test;
import com.carrotsearch.randomizedtesting.SeedUtils;
import com.google.common.base.Function;
import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
import com.google.common.collect.Collections2;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterators;
import com.google.common.collect.Sets;
import org.apache.lucene.util.IOUtils;
@ -31,8 +29,6 @@ import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.ShardRouting;
@ -44,9 +40,7 @@ import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.ImmutableSettings.Builder;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.gateway.Gateway;
import org.elasticsearch.index.engine.IndexEngineModule;
import org.elasticsearch.node.Node;
import org.elasticsearch.node.internal.InternalNode;
@ -67,9 +61,20 @@ import static com.google.common.collect.Maps.newTreeMap;
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
import static org.elasticsearch.node.NodeBuilder.nodeBuilder;
public class TestCluster implements Closeable, Iterable<Client> {
/**
* TestCluster manages a set of JVM private nodes and allows convenient access to them.
* The cluster supports randomized configuration such that nodes started in the cluster will
* automatically load asserting services tracking resources like file handles or open searchers.
* <p>
* The Cluster is bound to a test lifecycle where tests must call {@link #beforeTest(java.util.Random, double)} and
* {@link #afterTest()} to initialize and reset the cluster in order to be more reproducible. The term "more" relates
* to the async nature of Elasticsearch in combination with randomized testing. Once Threads and asynchronous calls
* are involved reproducibility is very limited. This class should only be used through {@link ElasticsearchIntegrationTest}.
* </p>
*/
public final class TestCluster implements Iterable<Client> {
protected final ESLogger logger = Loggers.getLogger(getClass());
private final ESLogger logger = Loggers.getLogger(getClass());
/* sorted map to make traverse order reproducible */
private final TreeMap<String, NodeAndClient> nodes = newTreeMap();
@ -98,11 +103,11 @@ public class TestCluster implements Closeable, Iterable<Client> {
private final NodeSettingsSource nodeSettingsSource;
public TestCluster(long clusterSeed, String clusterName) {
TestCluster(long clusterSeed, String clusterName) {
this(clusterSeed, -1, clusterName, NodeSettingsSource.EMPTY);
}
public TestCluster(long clusterSeed, int numNodes, String clusterName, NodeSettingsSource nodeSettingsSource) {
TestCluster(long clusterSeed, int numNodes, String clusterName, NodeSettingsSource nodeSettingsSource) {
this.clusterName = clusterName;
Random random = new Random(clusterSeed);
numSharedNodes = numNodes == -1 ? 2 + random.nextInt(4) : numNodes; // at least 2 nodes if randomized
@ -155,7 +160,7 @@ public class TestCluster implements Closeable, Iterable<Client> {
return builder.build();
}
public static String clusterName(String prefix, String childVMId, long clusterSeed) {
static String clusterName(String prefix, String childVMId, long clusterSeed) {
StringBuilder builder = new StringBuilder(prefix);
builder.append('-').append(NetworkUtils.getLocalAddress().getHostName());
builder.append("-CHILD_VM=[").append(childVMId).append(']');
@ -203,24 +208,34 @@ public class TestCluster implements Closeable, Iterable<Client> {
return null;
}
public synchronized void ensureAtLeastNumNodes(int num) {
/**
* Ensures that at least <code>n</code> nodes are present in the cluster.
* if more nodes than <code>n</code> are present this method will not
* stop any of the running nodes.
*/
public synchronized void ensureAtLeastNumNodes(int n) {
int size = nodes.size();
for (int i = size; i < num; i++) {
logger.info("increasing cluster size from {} to {}", size, num);
for (int i = size; i < n; i++) {
logger.info("increasing cluster size from {} to {}", size, n);
NodeAndClient buildNode = buildNode();
buildNode.node().start();
publishNode(buildNode);
}
}
public synchronized void ensureAtMostNumNodes(int num) {
if (nodes.size() <= num) {
/**
* Ensures that at most <code>n</code> are up and running.
* If less nodes that <code>n</code> are running this method
* will not start any additional nodes.
*/
public synchronized void ensureAtMostNumNodes(int n) {
if (nodes.size() <= n) {
return;
}
// prevent killing the master if possible
final Iterator<NodeAndClient> values = num == 0 ? nodes.values().iterator() : Iterators.filter(nodes.values().iterator(), Predicates.not(new MasterNodePredicate(getMasterName())));
final Iterator<NodeAndClient> limit = Iterators.limit(values, nodes.size() - num);
logger.info("reducing cluster size from {} to {}", nodes.size() - num, num);
final Iterator<NodeAndClient> values = n == 0 ? nodes.values().iterator() : Iterators.filter(nodes.values().iterator(), Predicates.not(new MasterNodePredicate(getMasterName())));
final Iterator<NodeAndClient> limit = Iterators.limit(values, nodes.size() - n);
logger.info("reducing cluster size from {} to {}", nodes.size() - n, n);
Set<NodeAndClient> nodesToRemove = new HashSet<NodeAndClient>();
while (limit.hasNext()) {
NodeAndClient next = limit.next();
@ -260,12 +275,16 @@ public class TestCluster implements Closeable, Iterable<Client> {
return "node_" + id;
}
public synchronized Client client() {
synchronized Client client() {
ensureOpen();
/* Randomly return a client to one of the nodes in the cluster */
return getOrBuildRandomNode().client(random);
}
/**
* Returns a node client to the current master node.
* Note: use this with care tests should not rely on a certain nodes client.
*/
public synchronized Client masterClient() {
ensureOpen();
NodeAndClient randomNodeAndClient = getRandomNodeAndClient(new MasterNodePredicate(getMasterName()));
@ -276,6 +295,9 @@ public class TestCluster implements Closeable, Iterable<Client> {
return null; // can't happen
}
/**
* Returns a node client to random node but not the master. This method will fail if no non-master client is available.
*/
public synchronized Client nonMasterClient() {
ensureOpen();
NodeAndClient randomNodeAndClient = getRandomNodeAndClient(Predicates.not(new MasterNodePredicate(getMasterName())));
@ -286,6 +308,9 @@ public class TestCluster implements Closeable, Iterable<Client> {
return null; // can't happen
}
/**
* Returns a client to a node started with "node.client: true"
*/
public synchronized Client clientNodeClient() {
ensureOpen();
NodeAndClient randomNodeAndClient = getRandomNodeAndClient(new ClientNodePredicate());
@ -296,15 +321,23 @@ public class TestCluster implements Closeable, Iterable<Client> {
return getRandomNodeAndClient(new ClientNodePredicate()).client(random);
}
public synchronized Client clientNodeClient(String nodeName) {
/**
* Returns a node client to a given node.
*/
public synchronized Client client(String nodeName) {
ensureOpen();
NodeAndClient randomNodeAndClient = nodes.get(nodeName);
if (randomNodeAndClient != null) {
return randomNodeAndClient.client(random);
NodeAndClient nodeAndClient = nodes.get(nodeName);
if (nodeAndClient != null) {
return nodeAndClient.client(random);
}
return null;
Assert.fail("No node found with name: [" + nodeName + "]");
return null; // can't happen
}
/**
* Returns a "smart" node client to a random node in the cluster
*/
public synchronized Client smartClient() {
NodeAndClient randomNodeAndClient = getRandomNodeAndClient();
if (randomNodeAndClient != null) {
@ -314,6 +347,11 @@ public class TestCluster implements Closeable, Iterable<Client> {
return null; // can't happen
}
/**
* Returns a random node that applies to the given predicate.
* The predicate can filter nodes based on the nodes settings.
* If all nodes are filtered out this method will return <code>null</code>
*/
public synchronized Client client(final Predicate<Settings> filterPredicate) {
ensureOpen();
final NodeAndClient randomNodeAndClient = getRandomNodeAndClient(new Predicate<NodeAndClient>() {
@ -328,7 +366,7 @@ public class TestCluster implements Closeable, Iterable<Client> {
return null;
}
public void close() {
void close() {
ensureOpen();
if (this.open.compareAndSet(true, false)) {
IOUtils.closeWhileHandlingException(nodes.values());
@ -336,17 +374,6 @@ public class TestCluster implements Closeable, Iterable<Client> {
}
}
public synchronized ImmutableSet<ClusterBlock> waitForNoBlocks(TimeValue timeout, Node node) throws InterruptedException {
ensureOpen();
long start = System.currentTimeMillis();
ImmutableSet<ClusterBlock> blocks;
do {
blocks = node.client().admin().cluster().prepareState().setLocal(true).execute().actionGet().getState().blocks()
.global(ClusterBlockLevel.METADATA);
} while (!blocks.isEmpty() && (System.currentTimeMillis() - start) < timeout.millis());
return blocks;
}
private final class NodeAndClient implements Closeable {
private InternalNode node;
private Client client;
@ -439,14 +466,14 @@ public class TestCluster implements Closeable, Iterable<Client> {
}
}
public static class ClientFactory {
static class ClientFactory {
public Client client(Node node, String clusterName, Random random) {
return node.client();
}
}
public static class TransportClientFactory extends ClientFactory {
static class TransportClientFactory extends ClientFactory {
private boolean sniff;
public static TransportClientFactory NO_SNIFF_CLIENT_FACTORY = new TransportClientFactory(false);
@ -467,7 +494,7 @@ public class TestCluster implements Closeable, Iterable<Client> {
}
}
public class RandomClientFactory extends ClientFactory {
class RandomClientFactory extends ClientFactory {
@Override
public Client client(Node node, String clusterName, Random random) {
@ -486,7 +513,10 @@ public class TestCluster implements Closeable, Iterable<Client> {
}
}
public synchronized void beforeTest(Random random, double transportClientRatio) {
/**
* This method should be exectued before each test to reset the cluster to it's initial state.
*/
synchronized void beforeTest(Random random, double transportClientRatio) {
reset(random, true, transportClientRatio);
}
@ -521,7 +551,7 @@ public class TestCluster implements Closeable, Iterable<Client> {
}
if (!changed && sharedNodes.size() == nodes.size()) {
logger.debug("Cluster is consistent - moving out - nodes: [{}] nextNodeId: [{}] numSharedNodes: [{}]", nodes.keySet(), nextNodeId.get(), sharedNodesSeeds.length);
if (numNodes() > 0) {
if (size() > 0) {
client().admin().cluster().prepareHealth().setWaitForNodes(Integer.toString(sharedNodesSeeds.length)).get();
}
return; // we are consistent - return
@ -541,14 +571,17 @@ public class TestCluster implements Closeable, Iterable<Client> {
publishNode(nodeAndClient);
}
nextNodeId.set(sharedNodesSeeds.length);
assert numNodes() == sharedNodesSeeds.length;
if (numNodes() > 0) {
assert size() == sharedNodesSeeds.length;
if (size() > 0) {
client().admin().cluster().prepareHealth().setWaitForNodes(Integer.toString(sharedNodesSeeds.length)).get();
}
logger.debug("Cluster is consistent again - nodes: [{}] nextNodeId: [{}] numSharedNodes: [{}]", nodes.keySet(), nextNodeId.get(), sharedNodesSeeds.length);
}
public synchronized void afterTest() {
/**
* This method should be executed during tearDown
*/
synchronized void afterTest() {
wipeDataDirectories();
resetClients(); /* reset all clients - each test gets it's own client based on the Random instance created above. */
@ -572,10 +605,16 @@ public class TestCluster implements Closeable, Iterable<Client> {
}
}
/**
* Returns a reference to a random nodes {@link ClusterService}
*/
public synchronized ClusterService clusterService() {
return getInstance(ClusterService.class);
}
/**
* Returns an Iterabel to all instances for the given class &gt;T&lt; across all nodes in the cluster.
*/
public synchronized <T> Iterable<T> getInstances(Class<T> clazz) {
List<T> instances = new ArrayList<T>(nodes.size());
for (NodeAndClient nodeAndClient : nodes.values()) {
@ -584,6 +623,9 @@ public class TestCluster implements Closeable, Iterable<Client> {
return instances;
}
/**
* Returns a reference to the given nodes instances of the given class &gt;T&lt;
*/
public synchronized <T> T getInstance(Class<T> clazz, final String node) {
final Predicate<TestCluster.NodeAndClient> predicate;
if (node != null) {
@ -600,6 +642,9 @@ public class TestCluster implements Closeable, Iterable<Client> {
return getInstanceFromNode(clazz, randomNodeAndClient.node);
}
/**
* Returns a reference to a random nodes instances of the given class &gt;T&lt;
*/
public synchronized <T> T getInstance(Class<T> clazz) {
return getInstance(clazz, null);
}
@ -608,10 +653,16 @@ public class TestCluster implements Closeable, Iterable<Client> {
return node.injector().getInstance(clazz);
}
public synchronized int numNodes() {
/**
* Returns the number of nodes in the cluster.
*/
public synchronized int size() {
return this.nodes.size();
}
/**
* Stops a random node in the cluster.
*/
public synchronized void stopRandomNode() {
ensureOpen();
NodeAndClient nodeAndClient = getRandomNodeAndClient();
@ -622,6 +673,10 @@ public class TestCluster implements Closeable, Iterable<Client> {
}
}
/**
* Stops a random node in the cluster that applies to the given filter or non if the non of the nodes applies to the
* filter.
*/
public synchronized void stopRandomNode(final Predicate<Settings> filter) {
ensureOpen();
NodeAndClient nodeAndClient = getRandomNodeAndClient(new Predicate<TestCluster.NodeAndClient>() {
@ -638,9 +693,12 @@ public class TestCluster implements Closeable, Iterable<Client> {
}
/**
* Stops the current master node forcefully
*/
public synchronized void stopCurrentMasterNode() {
ensureOpen();
assert numNodes() > 0;
assert size() > 0;
String masterNodeName = getMasterName();
assert nodes.containsKey(masterNodeName);
logger.info("Closing master node [{}] ", masterNodeName);
@ -648,6 +706,9 @@ public class TestCluster implements Closeable, Iterable<Client> {
remove.close();
}
/**
* Stops the any of the current nodes but not the master node.
*/
public void stopRandomNonMasterNode() {
NodeAndClient nodeAndClient = getRandomNodeAndClient(Predicates.not(new MasterNodePredicate(getMasterName())));
if (nodeAndClient != null) {
@ -656,11 +717,18 @@ public class TestCluster implements Closeable, Iterable<Client> {
nodeAndClient.close();
}
}
/**
* Restarts a random node in the cluster
*/
public void restartRandomNode() throws Exception {
restartRandomNode(EMPTY_CALLBACK);
}
/**
* Restarts a random node in the cluster and calls the callback during restart.
*/
public void restartRandomNode(RestartCallback callback) throws Exception {
ensureOpen();
NodeAndClient nodeAndClient = getRandomNodeAndClient();
@ -707,24 +775,38 @@ public class TestCluster implements Closeable, Iterable<Client> {
}
}
}
public static final RestartCallback EMPTY_CALLBACK = new RestartCallback() {
private static final RestartCallback EMPTY_CALLBACK = new RestartCallback() {
public Settings onNodeStopped(String node) {
return null;
}
};
/**
* Restarts all nodes in the cluster. It first stops all nodes and then restarts all the nodes again.
*/
public void fullRestart() throws Exception {
fullRestart(EMPTY_CALLBACK);
}
/**
* Restarts all nodes in a rolling restart fashion ie. only restarts on node a time.
*/
public void rollingRestart() throws Exception {
rollingRestart(EMPTY_CALLBACK);
}
/**
* Restarts all nodes in a rolling restart fashion ie. only restarts on node a time.
*/
public void rollingRestart(RestartCallback function) throws Exception {
restartAllNodes(true, function);
}
/**
* Restarts all nodes in the cluster. It first stops all nodes and then restarts all the nodes again.
*/
public void fullRestart(RestartCallback function) throws Exception {
restartAllNodes(false, function);
}
@ -740,12 +822,12 @@ public class TestCluster implements Closeable, Iterable<Client> {
}
}
public synchronized Set<String> allButN(int numNodes) {
return nRandomNodes(numNodes() - numNodes);
synchronized Set<String> allButN(int numNodes) {
return nRandomNodes(size() - numNodes);
}
public synchronized Set<String> nRandomNodes(int numNodes) {
assert numNodes() >= numNodes;
private synchronized Set<String> nRandomNodes(int numNodes) {
assert size() >= numNodes;
return Sets.newHashSet(Iterators.limit(this.nodes.keySet().iterator(), numNodes));
}
@ -754,6 +836,9 @@ public class TestCluster implements Closeable, Iterable<Client> {
startNode(settingsBuilder().put(settings).put("node.client", true));
}
/**
* Returns a set of nodes that have at least one shard of the given index.
*/
public synchronized Set<String> nodesInclude(String index) {
if (clusterService().state().routingTable().hasIndex(index)) {
List<ShardRouting> allShards = clusterService().state().routingTable().allShards(index);
@ -770,31 +855,23 @@ public class TestCluster implements Closeable, Iterable<Client> {
return Collections.emptySet();
}
public synchronized Set<String> nodeExclude(String index) {
final Set<String> nodesInclude = nodesInclude(index);
return Sets.newHashSet(Iterators.transform(Iterators.filter(nodes.values().iterator(), new Predicate<NodeAndClient>() {
@Override
public boolean apply(NodeAndClient nodeAndClient) {
return !nodesInclude.contains(nodeAndClient.name);
}
}), new Function<NodeAndClient, String>() {
@Override
public String apply(NodeAndClient nodeAndClient) {
return nodeAndClient.name;
}
}));
}
/**
* Starts a node with default settings and returns it's name.
*/
public String startNode() {
return startNode(ImmutableSettings.EMPTY);
}
/**
* Starts a node with the given settings builder and returns it's name.
*/
public String startNode(Settings.Builder settings) {
return startNode(settings.build());
}
/**
* Starts a node with the given settings and returns it's name.
*/
public String startNode(Settings settings) {
NodeAndClient buildNode = buildNode(settings);
buildNode.node().start();
@ -812,13 +889,6 @@ public class TestCluster implements Closeable, Iterable<Client> {
}
public void resetAllGateways() throws Exception {
Collection<NodeAndClient> values = this.nodes.values();
for (NodeAndClient nodeAndClient : values) {
getInstanceFromNode(Gateway.class, ((InternalNode) nodeAndClient.node)).reset();
}
}
public void closeNonSharedNodes(boolean wipeData) {
reset(random, wipeData, transportClientRatio);
}
@ -868,11 +938,14 @@ public class TestCluster implements Closeable, Iterable<Client> {
};
}
/**
* Returns a predicate that only accepts settings of nodes with one of the given names.
*/
public static Predicate<Settings> nameFilter(String... nodeName) {
return new NodeNamePredicate(new HashSet<String>(Arrays.asList(nodeName)));
}
private static final class NodeNamePredicate implements Predicate<Settings> {
private final HashSet<String> nodeNames;
@ -887,24 +960,45 @@ public class TestCluster implements Closeable, Iterable<Client> {
}
}
/**
* An abstract class that is called during {@link #rollingRestart(org.elasticsearch.test.TestCluster.RestartCallback)}
* and / or {@link #fullRestart(org.elasticsearch.test.TestCluster.RestartCallback)} to execute actions at certain
* stages of the restart.
*/
public static abstract class RestartCallback {
/**
* Executed once the give node name has been stopped.
*/
public Settings onNodeStopped(String nodeName) throws Exception {
return ImmutableSettings.EMPTY;
}
public void doAfterNodes(int numNodes, Client client) throws Exception {
/**
* Executed for each node before the <tt>n+1</tt> node is restarted. The given client is
* an active client to the node that will be restarted next.
*/
public void doAfterNodes(int n, Client client) throws Exception {
}
/**
* If this returns <code>true</code> all data for the node with the given node name will be cleared including
* gateways and all index data. Returns <code>false</code> by default.
*/
public boolean clearData(String nodeName) {
return false;
}
/**
* If this returns <code>false</code> the node with the given node name will not be restarted. It will be
* closed and removed from the cluster. Returns <code>true</code> by default.
*/
public boolean doRestart(String nodeName) {
return true;
}
}
}

View File

@ -225,39 +225,39 @@ public class UpdateTests extends ElasticsearchIntegrationTest {
assertThrows(client().prepareUpdate("test", "type", "1").setScript("ctx._source.text = 'v2'").setVersion(2).execute(),
VersionConflictEngineException.class);
run(client().prepareUpdate("test", "type", "1").setScript("ctx._source.text = 'v2'").setVersion(1));
assertThat(run(client().prepareGet("test", "type", "1")).getVersion(), equalTo(2l));
client().prepareUpdate("test", "type", "1").setScript("ctx._source.text = 'v2'").setVersion(1).get();
assertThat(client().prepareGet("test", "type", "1").get().getVersion(), equalTo(2l));
// and again with a higher version..
run(client().prepareUpdate("test", "type", "1").setScript("ctx._source.text = 'v3'").setVersion(2));
client().prepareUpdate("test", "type", "1").setScript("ctx._source.text = 'v3'").setVersion(2).get();
assertThat(run(client().prepareGet("test", "type", "1")).getVersion(), equalTo(3l));
assertThat(client().prepareGet("test", "type", "1").get().getVersion(), equalTo(3l));
// after delete
run(client().prepareDelete("test", "type", "1"));
client().prepareDelete("test", "type", "1").get();
assertThrows(client().prepareUpdate("test", "type", "1").setScript("ctx._source.text = 'v2'").setVersion(3).execute(),
DocumentMissingException.class);
// external versioning
run(client().prepareIndex("test", "type", "2").setSource("text", "value").setVersion(10).setVersionType(VersionType.EXTERNAL));
client().prepareIndex("test", "type", "2").setSource("text", "value").setVersion(10).setVersionType(VersionType.EXTERNAL).get();
assertThrows(client().prepareUpdate("test", "type", "2").setScript("ctx._source.text = 'v2'").setVersion(2).setVersionType(VersionType.EXTERNAL).execute(),
VersionConflictEngineException.class);
run(client().prepareUpdate("test", "type", "2").setScript("ctx._source.text = 'v2'").setVersion(11).setVersionType(VersionType.EXTERNAL));
client().prepareUpdate("test", "type", "2").setScript("ctx._source.text = 'v2'").setVersion(11).setVersionType(VersionType.EXTERNAL).get();
assertThat(run(client().prepareGet("test", "type", "2")).getVersion(), equalTo(11l));
assertThat(client().prepareGet("test", "type", "2").get().getVersion(), equalTo(11l));
// upserts - the combination with versions is a bit weird. Test are here to ensure we do not change our behavior unintentionally
// With internal versions, tt means "if object is there with version X, update it or explode. If it is not there, index.
run(client().prepareUpdate("test", "type", "3").setScript("ctx._source.text = 'v2'").setVersion(10).setUpsert("{ \"text\": \"v0\" }"));
client().prepareUpdate("test", "type", "3").setScript("ctx._source.text = 'v2'").setVersion(10).setUpsert("{ \"text\": \"v0\" }").get();
GetResponse get = get("test", "type", "3");
assertThat(get.getVersion(), equalTo(1l));
assertThat((String) get.getSource().get("text"), equalTo("v0"));
// With external versions, it means - if object is there with version lower than X, update it or explode. If it is not there, insert with new version.
run(client().prepareUpdate("test", "type", "4").setScript("ctx._source.text = 'v2'").
setVersion(10).setVersionType(VersionType.EXTERNAL).setUpsert("{ \"text\": \"v0\" }"));
client().prepareUpdate("test", "type", "4").setScript("ctx._source.text = 'v2'").
setVersion(10).setVersionType(VersionType.EXTERNAL).setUpsert("{ \"text\": \"v0\" }").get();
get = get("test", "type", "4");
assertThat(get.getVersion(), equalTo(10l));
assertThat((String) get.getSource().get("text"), equalTo("v0"));

View File

@ -23,7 +23,7 @@ public class ConcurrentDocumentOperationTests extends ElasticsearchIntegrationTe
logger.info("--> create an index with 1 shard and max replicas based on nodes");
client().admin().indices().prepareCreate("test")
.setSettings(settingsBuilder().put("index.number_of_shards", 1).put("index.number_of_replicas", cluster().numNodes()-1))
.setSettings(settingsBuilder().put("index.number_of_shards", 1).put("index.number_of_replicas", cluster().size()-1))
.execute().actionGet();
logger.info("execute concurrent updates on the same doc");
@ -54,7 +54,7 @@ public class ConcurrentDocumentOperationTests extends ElasticsearchIntegrationTe
logger.info("done indexing, check all have the same field value");
Map masterSource = client().prepareGet("test", "type1", "1").execute().actionGet().getSourceAsMap();
for (int i = 0; i < (cluster().numNodes() * 5); i++) {
for (int i = 0; i < (cluster().size() * 5); i++) {
assertThat(client().prepareGet("test", "type1", "1").execute().actionGet().getSourceAsMap(), equalTo(masterSource));
}
}