Introduce dedicated master nodes in testing infrastructure (#18514)

This PR changes the InternalTestCluster to support dedicated master nodes. The creation of dedicated master nodes can be controlled using a new `supportsDedicatedMasters` parameter of the ClusterScope annotation. If set to true (the default), dedicated master nodes will randomly be used. If set to false, no dedicated master nodes will be created and data nodes will also be allowed to become masters. If active, test runs will have either one or three dedicated master nodes.
Boaz Leskes 2016-05-27 08:44:20 +02:00
parent fb763c1e8e
commit 318a4e3ef6
37 changed files with 348 additions and 239 deletions
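A minimal usage sketch (the test class below is hypothetical and not part of this commit; only the `supportsDedicatedMasters` attribute and the existing ClusterScope/ESIntegTestCase infrastructure are assumed from the change): a suite that cannot tolerate dedicated master nodes opts out explicitly, while leaving the attribute at its default lets the test infrastructure randomly add one or three dedicated masters.

import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
import org.elasticsearch.test.ESIntegTestCase.Scope;

// Hypothetical example test, not part of this commit.
@ClusterScope(scope = Scope.TEST, supportsDedicatedMasters = false, numDataNodes = 1)
public class ExampleNoDedicatedMastersIT extends ESIntegTestCase {
    public void testSingleNodeIsMasterAndData() {
        // with supportsDedicatedMasters = false the data node is also master-eligible,
        // so the cluster consists of exactly one node
        assertEquals(1, internalCluster().size());
    }
}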


@@ -371,7 +371,7 @@ public class TasksIT extends ESIntegTestCase {
logger.info("--> started test tasks");
// Wait for the task to start on all nodes
-assertBusy(() -> assertEquals(internalCluster().numDataAndMasterNodes(),
+assertBusy(() -> assertEquals(internalCluster().size(),
client().admin().cluster().prepareListTasks().setActions(TestTaskPlugin.TestTaskAction.NAME + "[n]").get().getTasks().size()));
logger.info("--> cancelling the main test task");
@@ -391,7 +391,7 @@ public class TasksIT extends ESIntegTestCase {
ListenableActionFuture<TestTaskPlugin.NodesResponse> future = TestTaskPlugin.TestTaskAction.INSTANCE.newRequestBuilder(client())
.execute();
// Wait for the task to start on all nodes
-assertBusy(() -> assertEquals(internalCluster().numDataAndMasterNodes(),
+assertBusy(() -> assertEquals(internalCluster().size(),
client().admin().cluster().prepareListTasks().setActions(TestTaskPlugin.TestTaskAction.NAME + "[n]").get().getTasks().size()));
TestTaskPlugin.UnblockTestTasksAction.INSTANCE.newRequestBuilder(client()).get();
@@ -409,7 +409,7 @@ public class TasksIT extends ESIntegTestCase {
ListenableActionFuture<ListTasksResponse> waitResponseFuture;
try {
// Wait for the task to start on all nodes
-assertBusy(() -> assertEquals(internalCluster().numDataAndMasterNodes(), client().admin().cluster().prepareListTasks()
+assertBusy(() -> assertEquals(internalCluster().size(), client().admin().cluster().prepareListTasks()
.setActions(TestTaskPlugin.TestTaskAction.NAME + "[n]").get().getTasks().size()));
// Spin up a request to wait for that task to finish
@@ -437,7 +437,7 @@ public class TasksIT extends ESIntegTestCase {
.execute();
try {
// Wait for the task to start on all nodes
-assertBusy(() -> assertEquals(internalCluster().numDataAndMasterNodes(), client().admin().cluster().prepareListTasks()
+assertBusy(() -> assertEquals(internalCluster().size(), client().admin().cluster().prepareListTasks()
.setActions(TestTaskPlugin.TestTaskAction.NAME + "[n]").get().getTasks().size()));
// Spin up a request that should wait for those tasks to finish


@@ -276,7 +276,7 @@ public class TestTaskPlugin extends Plugin {
protected String[] filterNodeIds(DiscoveryNodes nodes, String[] nodesIds) {
List<String> list = new ArrayList<>();
for (String node : nodesIds) {
-if (nodes.getDataNodes().containsKey(node)) {
+if (nodes.nodeExists(node)) {
list.add(node);
}
}


@@ -27,7 +27,6 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsExecutors;
-import org.elasticsearch.index.store.Store;
import org.elasticsearch.node.Node;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
@@ -37,12 +36,12 @@ import org.hamcrest.Matchers;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.ExecutionException;
-import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
-@ClusterScope(scope = Scope.SUITE, numDataNodes = 1, numClientNodes = 0)
+@ClusterScope(scope = Scope.TEST, numDataNodes = 0)
public class ClusterStatsIT extends ESIntegTestCase {
private void assertCounts(ClusterStatsNodes.Counts counts, int total, Map<String, Integer> roles) {
@@ -58,12 +57,14 @@ public class ClusterStatsIT extends ESIntegTestCase {
public void testNodeCounts() {
int total = 1;
+internalCluster().startNode();
Map<String, Integer> expectedCounts = new HashMap<>();
expectedCounts.put(DiscoveryNode.Role.DATA.getRoleName(), 1);
expectedCounts.put(DiscoveryNode.Role.MASTER.getRoleName(), 1);
expectedCounts.put(DiscoveryNode.Role.INGEST.getRoleName(), 1);
expectedCounts.put(ClusterStatsNodes.Counts.COORDINATING_ONLY, 0);
int numNodes = randomIntBetween(1, 5);
ClusterStatsResponse response = client().admin().cluster().prepareClusterStats().get();
assertCounts(response.getNodesStats().getCounts(), total, expectedCounts);
@@ -112,7 +113,9 @@
assertThat(stats.getReplication(), Matchers.equalTo(replicationFactor));
}
-public void testIndicesShardStats() {
+public void testIndicesShardStats() throws ExecutionException, InterruptedException {
+internalCluster().startNode();
+ensureGreen();
ClusterStatsResponse response = client().admin().cluster().prepareClusterStats().get();
assertThat(response.getStatus(), Matchers.equalTo(ClusterHealthStatus.GREEN));
@@ -155,10 +158,8 @@
}
-public void testValuesSmokeScreen() throws IOException {
+public void testValuesSmokeScreen() throws IOException, ExecutionException, InterruptedException {
-internalCluster().ensureAtMostNumDataNodes(5);
-internalCluster().ensureAtLeastNumDataNodes(1);
-assertAcked(prepareCreate("test1").setSettings(Settings.builder().put(Store.INDEX_STORE_STATS_REFRESH_INTERVAL_SETTING.getKey(), 0).build()));
+internalCluster().startNodesAsync(randomIntBetween(1, 3)).get();
index("test1", "type", "1", "f", "f");
/*
* Ensure at least one shard is allocated otherwise the FS stats might
@@ -188,9 +189,6 @@
}
public void testAllocatedProcessors() throws Exception {
-// stop all other nodes
-internalCluster().ensureAtMostNumDataNodes(0);
// start one node with 7 processors.
internalCluster().startNodesAsync(Settings.builder().put(EsExecutors.PROCESSORS_SETTING.getKey(), 7).build()).get();
waitForNodes(1);
@@ -200,14 +198,15 @@
}
public void testClusterStatusWhenStateNotRecovered() throws Exception {
-// stop all other nodes
-internalCluster().ensureAtMostNumDataNodes(0);
-internalCluster().startNode(Settings.builder().put("gateway.recover_after_nodes", 2).build());
+internalCluster().startMasterOnlyNode(Settings.builder().put("gateway.recover_after_nodes", 2).build());
ClusterStatsResponse response = client().admin().cluster().prepareClusterStats().get();
assertThat(response.getStatus(), equalTo(ClusterHealthStatus.RED));
-internalCluster().ensureAtLeastNumDataNodes(3);
+if (randomBoolean()) {
+internalCluster().startMasterOnlyNode(Settings.EMPTY);
+} else {
+internalCluster().startDataOnlyNode(Settings.EMPTY);
+}
// wait for the cluster status to settle
ensureGreen();
response = client().admin().cluster().prepareClusterStats().get();


@@ -29,7 +29,6 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.env.Environment;
import org.elasticsearch.node.Node;
-import org.elasticsearch.node.internal.InternalSettingsPreparer;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
import org.elasticsearch.test.ESIntegTestCase.Scope;
@@ -41,7 +40,7 @@ import java.util.concurrent.ExecutionException;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
-@ClusterScope(scope = Scope.TEST, numClientNodes = 0)
+@ClusterScope(scope = Scope.TEST, numClientNodes = 0, supportsDedicatedMasters = false)
public class TransportClientRetryIT extends ESIntegTestCase {
public void testRetry() throws IOException, ExecutionException, InterruptedException {
Iterable<TransportService> instances = internalCluster().getInstances(TransportService.class);


@@ -37,7 +37,7 @@ import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.nullValue;
-@ClusterScope(scope = SUITE, numDataNodes = 1)
+@ClusterScope(scope = SUITE, supportsDedicatedMasters = false, numDataNodes = 1)
public class SettingsFilteringIT extends ESIntegTestCase {
@Override


@@ -43,7 +43,7 @@ public class ClusterSearchShardsIT extends ESIntegTestCase {
@Override
protected Settings nodeSettings(int nodeOrdinal) {
-switch(nodeOrdinal) {
+switch(nodeOrdinal % 2) {
case 1:
return Settings.builder().put(super.nodeSettings(nodeOrdinal)).put("node.attr.tag", "B").build();
case 0:


@@ -41,7 +41,7 @@ import static org.hamcrest.Matchers.hasSize;
/**
*
*/
-@ClusterScope(scope = Scope.TEST, numDataNodes = 1)
+@ClusterScope(scope = Scope.TEST, supportsDedicatedMasters = false, numDataNodes = 1)
public class NettyHttpRequestSizeLimitIT extends ESIntegTestCase {
private static final ByteSizeValue LIMIT = new ByteSizeValue(2, ByteSizeUnit.KB);


@@ -40,7 +40,7 @@ import static org.hamcrest.Matchers.hasSize;
/**
*
*/
-@ClusterScope(scope = Scope.TEST, numDataNodes = 1)
+@ClusterScope(scope = Scope.TEST, supportsDedicatedMasters = false, numDataNodes = 1)
public class NettyPipeliningDisabledIT extends ESIntegTestCase {
@Override
protected Settings nodeSettings(int nodeOrdinal) {


@@ -36,7 +36,7 @@ import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
-@ClusterScope(scope = Scope.TEST, numDataNodes = 1)
+@ClusterScope(scope = Scope.TEST, supportsDedicatedMasters = false, numDataNodes = 1)
public class NettyPipeliningEnabledIT extends ESIntegTestCase {
@Override
protected Settings nodeSettings(int nodeOrdinal) {


@@ -34,7 +34,7 @@ import java.util.Collections;
import static org.elasticsearch.test.ESIntegTestCase.Scope.SUITE;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
-@ClusterScope(scope = SUITE, numDataNodes = 1, numClientNodes = 0)
+@ClusterScope(scope = SUITE, supportsDedicatedMasters = false, numDataNodes = 1, numClientNodes = 0)
public class SettingsListenerIT extends ESIntegTestCase {
@Override
@@ -109,13 +109,13 @@ public class SettingsListenerIT extends ESIntegTestCase {
.put("index.test.new.setting", 21)
.build()).get());
-for (SettingsTestingService instance : internalCluster().getInstances(SettingsTestingService.class)) {
+for (SettingsTestingService instance : internalCluster().getDataNodeInstances(SettingsTestingService.class)) {
assertEquals(21, instance.value);
}
client().admin().indices().prepareUpdateSettings("test").setSettings(Settings.builder()
.put("index.test.new.setting", 42)).get();
-for (SettingsTestingService instance : internalCluster().getInstances(SettingsTestingService.class)) {
+for (SettingsTestingService instance : internalCluster().getDataNodeInstances(SettingsTestingService.class)) {
assertEquals(42, instance.value);
}
@@ -123,14 +123,14 @@ public class SettingsListenerIT extends ESIntegTestCase {
.put("index.test.new.setting", 21)
.build()).get());
-for (SettingsTestingService instance : internalCluster().getInstances(SettingsTestingService.class)) {
+for (SettingsTestingService instance : internalCluster().getDataNodeInstances(SettingsTestingService.class)) {
assertEquals(42, instance.value);
}
client().admin().indices().prepareUpdateSettings("other").setSettings(Settings.builder()
.put("index.test.new.setting", 84)).get();
-for (SettingsTestingService instance : internalCluster().getInstances(SettingsTestingService.class)) {
+for (SettingsTestingService instance : internalCluster().getDataNodeInstances(SettingsTestingService.class)) {
assertEquals(42, instance.value);
}


@@ -37,7 +37,7 @@ import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
-@ClusterScope(numDataNodes = 1, scope = Scope.SUITE)
+@ClusterScope(supportsDedicatedMasters = false, numDataNodes = 1, scope = Scope.SUITE)
public class InternalEngineMergeIT extends ESIntegTestCase {
@TestLogging("_root:DEBUG")


@@ -18,6 +18,7 @@
*/
package org.elasticsearch.index.store;
+import com.carrotsearch.hppc.cursors.IntObjectCursor;
import com.carrotsearch.randomizedtesting.generators.RandomPicks;
import org.apache.lucene.index.CheckIndex;
import org.apache.lucene.index.IndexFileNames;
@@ -26,6 +27,7 @@ import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
+import org.elasticsearch.action.admin.indices.shards.IndicesShardStoresResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Requests;
@@ -48,6 +50,7 @@ import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.gateway.PrimaryShardAllocator;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexSettings;
@@ -65,7 +68,6 @@ import org.elasticsearch.snapshots.SnapshotState;
import org.elasticsearch.test.CorruptionUtils;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.InternalSettingsPlugin;
-import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.test.MockIndexEventListener;
import org.elasticsearch.test.store.MockFSIndexStore;
import org.elasticsearch.test.transport.MockTransportService;
@@ -84,9 +86,7 @@ import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.CopyOnWriteArrayList;
@@ -113,13 +113,13 @@ public class CorruptedFileIT extends ESIntegTestCase {
@Override
protected Settings nodeSettings(int nodeOrdinal) {
return Settings.builder()
// we really need local GW here since this also checks for corruption etc.
// and we need to make sure primaries are not just trashed if we don't have replicas
.put(super.nodeSettings(nodeOrdinal))
// speed up recoveries
.put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING.getKey(), 5)
.put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_OUTGOING_RECOVERIES_SETTING.getKey(), 5)
.build();
}
@Override
@@ -142,11 +142,11 @@ public class CorruptedFileIT extends ESIntegTestCase {
assertThat(cluster().numDataNodes(), greaterThanOrEqualTo(3));
assertAcked(prepareCreate("test").setSettings(Settings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, "1")
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, "1")
.put(MergePolicyConfig.INDEX_MERGE_ENABLED, false)
.put(MockFSIndexStore.INDEX_CHECK_INDEX_ON_CLOSE_SETTING.getKey(), false) // no checkindex - we corrupt shards on purpose
.put(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(), new ByteSizeValue(1, ByteSizeUnit.PB)) // no translog based flush - it might change the .liv / segments.N files
));
ensureGreen();
disableAllocation("test");
@@ -171,9 +171,9 @@
Settings build = Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, "2").build();
client().admin().indices().prepareUpdateSettings("test").setSettings(build).get();
ClusterHealthResponse health = client().admin().cluster()
.health(Requests.clusterHealthRequest("test").waitForGreenStatus()
.timeout("5m") // sometimes due to cluster rebalacing and random settings default timeout is just not enough.
.waitForRelocatingShards(0)).actionGet();
if (health.isTimedOut()) {
logger.info("cluster state:\n{}\n{}", client().admin().cluster().prepareState().get().getState().prettyPrint(), client().admin().cluster().preparePendingClusterTasks().get().prettyPrint());
assertThat("timed out waiting for green state", health.isTimedOut(), equalTo(false));
@@ -247,10 +247,10 @@
internalCluster().ensureAtLeastNumDataNodes(2);
assertAcked(prepareCreate("test").setSettings(Settings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, "0")
.put(MergePolicyConfig.INDEX_MERGE_ENABLED, false)
.put(MockFSIndexStore.INDEX_CHECK_INDEX_ON_CLOSE_SETTING.getKey(), false) // no checkindex - we corrupt shards on purpose
.put(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(), new ByteSizeValue(1, ByteSizeUnit.PB)) // no translog based flush - it might change the .liv / segments.N files
));
ensureGreen();
IndexRequestBuilder[] builders = new IndexRequestBuilder[numDocs];
@@ -273,12 +273,12 @@
client().admin().cluster().prepareReroute().get();
boolean didClusterTurnRed = awaitBusy(() -> {
ClusterHealthStatus test = client().admin().cluster()
.health(Requests.clusterHealthRequest("test")).actionGet().getStatus();
return test == ClusterHealthStatus.RED;
}, 5, TimeUnit.MINUTES);// sometimes on slow nodes the replication / recovery is just dead slow
final ClusterHealthResponse response = client().admin().cluster()
.health(Requests.clusterHealthRequest("test")).get();
if (response.getStatus() != ClusterHealthStatus.RED) {
logger.info("Cluster turned red in busy loop: {}", didClusterTurnRed);
logger.info("cluster state:\n{}\n{}", client().admin().cluster().prepareState().get().getState().prettyPrint(), client().admin().cluster().preparePendingClusterTasks().get().prettyPrint());
@@ -328,10 +328,10 @@
NodeStats primariesNode = dataNodeStats.get(0);
NodeStats unluckyNode = dataNodeStats.get(1);
assertAcked(prepareCreate("test").setSettings(Settings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, "0")
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put("index.routing.allocation.include._name", primariesNode.getNode().getName())
.put(EnableAllocationDecider.INDEX_ROUTING_REBALANCE_ENABLE_SETTING.getKey(), EnableAllocationDecider.Rebalance.NONE)
));
ensureGreen(); // allocated with empty commit
@@ -356,8 +356,8 @@
}
Settings build = Settings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, "1")
.put("index.routing.allocation.include._name", primariesNode.getNode().getName() + "," + unluckyNode.getNode().getName()).build();
client().admin().indices().prepareUpdateSettings("test").setSettings(build).get();
client().admin().cluster().prepareReroute().get();
hasCorrupted.await();
@@ -390,12 +390,12 @@
assertAcked(prepareCreate("test").setSettings(Settings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, "0")
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, between(1, 4)) // don't go crazy here it must recovery fast
// This does corrupt files on the replica, so we can't check:
.put(MockFSIndexStore.INDEX_CHECK_INDEX_ON_CLOSE_SETTING.getKey(), false)
.put("index.routing.allocation.include._name", primariesNode.getNode().getName())
.put(EnableAllocationDecider.INDEX_ROUTING_REBALANCE_ENABLE_SETTING.getKey(), EnableAllocationDecider.Rebalance.NONE)
));
ensureGreen();
IndexRequestBuilder[] builders = new IndexRequestBuilder[numDocs];
@@ -432,12 +432,12 @@
}
Settings build = Settings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, "1")
.put("index.routing.allocation.include._name", "*").build();
client().admin().indices().prepareUpdateSettings("test").setSettings(build).get();
client().admin().cluster().prepareReroute().get();
ClusterHealthResponse actionGet = client().admin().cluster()
.health(Requests.clusterHealthRequest("test").waitForGreenStatus()).actionGet();
if (actionGet.isTimedOut()) {
logger.info("ensureGreen timed out, cluster state:\n{}\n{}", client().admin().cluster().prepareState().get().getState().prettyPrint(), client().admin().cluster().preparePendingClusterTasks().get().prettyPrint());
assertThat("timed out waiting for green state", actionGet.isTimedOut(), equalTo(false));
@@ -525,11 +525,12 @@
internalCluster().ensureAtLeastNumDataNodes(2);
assertAcked(prepareCreate("test").setSettings(Settings.builder()
.put(PrimaryShardAllocator.INDEX_RECOVERY_INITIAL_SHARDS_SETTING.getKey(), "one")
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, cluster().numDataNodes() - 1)
.put(MergePolicyConfig.INDEX_MERGE_ENABLED, false)
.put(MockFSIndexStore.INDEX_CHECK_INDEX_ON_CLOSE_SETTING.getKey(), false) // no checkindex - we corrupt shards on purpose
-.put(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(), new ByteSizeValue(1, ByteSizeUnit.PB)) // no translog based flush - it might change the .liv / segments.N files
+.put(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(),
+new ByteSizeValue(1, ByteSizeUnit.PB)) // no translog based flush - it might change the .liv / segments.N files
));
ensureGreen();
IndexRequestBuilder[] builders = new IndexRequestBuilder[numDocs];
@@ -543,22 +544,41 @@
SearchResponse countResponse = client().prepareSearch().setSize(0).get();
assertHitCount(countResponse, numDocs);
-final Map<String, List<Path>> filesToCorrupt = findFilesToCorruptForReplica();
-internalCluster().fullRestart(new InternalTestCluster.RestartCallback() {
-@Override
-public Settings onNodeStopped(String nodeName) throws Exception {
-List<Path> paths = filesToCorrupt.get(nodeName);
-if (paths != null) {
-for (Path path : paths) {
+// disable allocations of replicas post restart (the restart will change replicas to primaries, so we have
+// to capture replicas post restart)
+assertAcked(client().admin().cluster().prepareUpdateSettings().setPersistentSettings(
+Settings.builder().put(EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING.getKey(), "primaries")
+));
+internalCluster().fullRestart();
+ensureYellow();
+final Index index = resolveIndex("test");
+final IndicesShardStoresResponse stores = client().admin().indices().prepareShardStores(index.getName()).get();
+for (IntObjectCursor<List<IndicesShardStoresResponse.StoreStatus>> shards :
+stores.getStoreStatuses().get(index.getName())) {
+for (IndicesShardStoresResponse.StoreStatus store : shards.value) {
+final ShardId shardId = new ShardId(index, shards.key);
+if (store.getAllocationStatus().equals(IndicesShardStoresResponse.StoreStatus.AllocationStatus.UNUSED)) {
+for (Path path : findFilesToCorruptOnNode(store.getNode().getName(), shardId)) {
try (OutputStream os = Files.newOutputStream(path)) {
os.write(0);
}
-logger.info("corrupting file {} on node {}", path, nodeName);
+logger.info("corrupting file {} on node {}", path, store.getNode().getName());
}
}
-return null;
}
-});
+}
+// enable allocation
+assertAcked(client().admin().cluster().prepareUpdateSettings().setPersistentSettings(
+Settings.builder().putNull(EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING.getKey())
+));
ensureGreen();
}
@@ -568,34 +588,21 @@
return shardIterators.size();
}
-private Map<String, List<Path>> findFilesToCorruptForReplica() throws IOException {
-Map<String, List<Path>> filesToNodes = new HashMap<>();
-ClusterState state = client().admin().cluster().prepareState().get().getState();
-Index test = state.metaData().index("test").getIndex();
-for (ShardRouting shardRouting : state.getRoutingTable().allShards("test")) {
-if (shardRouting.primary()) {
-continue;
-}
-assertTrue(shardRouting.assignedToNode());
-NodesStatsResponse nodeStatses = client().admin().cluster().prepareNodesStats(shardRouting.currentNodeId()).setFs(true).get();
-NodeStats nodeStats = nodeStatses.getNodes().get(0);
-List<Path> files = new ArrayList<>();
-filesToNodes.put(nodeStats.getNode().getName(), files);
-for (FsInfo.Path info : nodeStats.getFs()) {
-String path = info.getPath();
-Path file = PathUtils.get(path).resolve("indices").resolve(test.getUUID()).resolve(Integer.toString(shardRouting.getId())).resolve("index");
-if (Files.exists(file)) { // multi data path might only have one path in use
-try (DirectoryStream<Path> stream = Files.newDirectoryStream(file)) {
-for (Path item : stream) {
-if (item.getFileName().toString().startsWith("segments_")) {
-files.add(item);
-}
+private List<Path> findFilesToCorruptOnNode(final String nodeName, final ShardId shardId) throws IOException {
+List<Path> files = new ArrayList<>();
+for (Path path : internalCluster().getInstance(NodeEnvironment.class, nodeName).availableShardPaths(shardId)) {
+path = path.resolve("index");
+if (Files.exists(path)) { // multi data path might only have one path in use
+try (DirectoryStream<Path> stream = Files.newDirectoryStream(path)) {
+for (Path item : stream) {
+if (item.getFileName().toString().startsWith("segments_")) {
+files.add(item);
}
}
}
}
}
-return filesToNodes;
+return files;
}
private ShardRouting corruptRandomPrimaryFile() throws IOException {


@@ -39,16 +39,16 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexModule;
import org.elasticsearch.index.IndexSettings;
+import org.elasticsearch.index.MergePolicyConfig;
+import org.elasticsearch.index.MergeSchedulerConfig;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.cache.query.QueryCacheStats;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.index.query.QueryBuilders;
-import org.elasticsearch.index.MergePolicyConfig;
-import org.elasticsearch.index.MergeSchedulerConfig;
import org.elasticsearch.index.store.IndexStore;
import org.elasticsearch.index.translog.Translog;
-import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.IndicesRequestCache;
+import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
@@ -103,7 +103,7 @@ public class IndexStatsIT extends ESIntegTestCase {
client().prepareIndex("test", "type", "2").setSource("field", "value2", "field2", "value2").execute().actionGet();
client().admin().indices().prepareRefresh().execute().actionGet();
-NodesStatsResponse nodesStats = client().admin().cluster().prepareNodesStats().setIndices(true).execute().actionGet();
+NodesStatsResponse nodesStats = client().admin().cluster().prepareNodesStats("data:true").setIndices(true).execute().actionGet();
assertThat(nodesStats.getNodes().get(0).getIndices().getFieldData().getMemorySizeInBytes() + nodesStats.getNodes().get(1).getIndices().getFieldData().getMemorySizeInBytes(), equalTo(0L));
IndicesStatsResponse indicesStats = client().admin().indices().prepareStats("test").clear().setFieldData(true).execute().actionGet();
assertThat(indicesStats.getTotal().getFieldData().getMemorySizeInBytes(), equalTo(0L));
@@ -112,7 +112,7 @@
client().prepareSearch().addSort("field", SortOrder.ASC).execute().actionGet();
client().prepareSearch().addSort("field", SortOrder.ASC).execute().actionGet();
-nodesStats = client().admin().cluster().prepareNodesStats().setIndices(true).execute().actionGet();
+nodesStats = client().admin().cluster().prepareNodesStats("data:true").setIndices(true).execute().actionGet();
assertThat(nodesStats.getNodes().get(0).getIndices().getFieldData().getMemorySizeInBytes() + nodesStats.getNodes().get(1).getIndices().getFieldData().getMemorySizeInBytes(), greaterThan(0L));
indicesStats = client().admin().indices().prepareStats("test").clear().setFieldData(true).execute().actionGet();
assertThat(indicesStats.getTotal().getFieldData().getMemorySizeInBytes(), greaterThan(0L));
@@ -122,7 +122,7 @@
client().prepareSearch().addSort("field2", SortOrder.ASC).execute().actionGet();
// now check the per field stats
-nodesStats = client().admin().cluster().prepareNodesStats().setIndices(new CommonStatsFlags().set(CommonStatsFlags.Flag.FieldData, true).fieldDataFields("*")).execute().actionGet();
+nodesStats = client().admin().cluster().prepareNodesStats("data:true").setIndices(new CommonStatsFlags().set(CommonStatsFlags.Flag.FieldData, true).fieldDataFields("*")).execute().actionGet();
assertThat(nodesStats.getNodes().get(0).getIndices().getFieldData().getMemorySizeInBytes() + nodesStats.getNodes().get(1).getIndices().getFieldData().getMemorySizeInBytes(), greaterThan(0L));
assertThat(nodesStats.getNodes().get(0).getIndices().getFieldData().getFields().get("field") + nodesStats.getNodes().get(1).getIndices().getFieldData().getFields().get("field"), greaterThan(0L));
assertThat(nodesStats.getNodes().get(0).getIndices().getFieldData().getFields().get("field") + nodesStats.getNodes().get(1).getIndices().getFieldData().getFields().get("field"), lessThan(nodesStats.getNodes().get(0).getIndices().getFieldData().getMemorySizeInBytes() + nodesStats.getNodes().get(1).getIndices().getFieldData().getMemorySizeInBytes()));
@@ -133,7 +133,7 @@
assertThat(indicesStats.getTotal().getFieldData().getFields().get("field"), lessThan(indicesStats.getTotal().getFieldData().getMemorySizeInBytes()));
client().admin().indices().prepareClearCache().setFieldDataCache(true).execute().actionGet();
-nodesStats = client().admin().cluster().prepareNodesStats().setIndices(true).execute().actionGet();
+nodesStats = client().admin().cluster().prepareNodesStats("data:true").setIndices(true).execute().actionGet();
assertThat(nodesStats.getNodes().get(0).getIndices().getFieldData().getMemorySizeInBytes() + nodesStats.getNodes().get(1).getIndices().getFieldData().getMemorySizeInBytes(), equalTo(0L));
indicesStats = client().admin().indices().prepareStats("test").clear().setFieldData(true).execute().actionGet();
assertThat(indicesStats.getTotal().getFieldData().getMemorySizeInBytes(), equalTo(0L));
@@ -150,7 +150,7 @@
client().prepareIndex("test", "type", "2").setSource("field", "value2").execute().actionGet();
client().admin().indices().prepareRefresh().execute().actionGet();
-NodesStatsResponse nodesStats = client().admin().cluster().prepareNodesStats().setIndices(true)
+NodesStatsResponse nodesStats = client().admin().cluster().prepareNodesStats("data:true").setIndices(true)
.execute().actionGet();
assertThat(nodesStats.getNodes().get(0).getIndices().getFieldData().getMemorySizeInBytes() + nodesStats.getNodes().get(1).getIndices().getFieldData().getMemorySizeInBytes(), equalTo(0L));
assertThat(nodesStats.getNodes().get(0).getIndices().getQueryCache().getMemorySizeInBytes() + nodesStats.getNodes().get(1).getIndices().getQueryCache().getMemorySizeInBytes(), equalTo(0L));
@@ -171,7 +171,7 @@
.addSort("field", SortOrder.ASC)
.execute().actionGet();
-nodesStats = client().admin().cluster().prepareNodesStats().setIndices(true)
+nodesStats = client().admin().cluster().prepareNodesStats("data:true").setIndices(true)
.execute().actionGet();
assertThat(nodesStats.getNodes().get(0).getIndices().getFieldData().getMemorySizeInBytes() + nodesStats.getNodes().get(1).getIndices().getFieldData().getMemorySizeInBytes(), greaterThan(0L));
assertThat(nodesStats.getNodes().get(0).getIndices().getQueryCache().getMemorySizeInBytes() + nodesStats.getNodes().get(1).getIndices().getQueryCache().getMemorySizeInBytes(), greaterThan(0L));
@@ -184,7 +184,7 @@
client().admin().indices().prepareClearCache().execute().actionGet();
Thread.sleep(100); // Make sure the filter cache entries have been removed...
-nodesStats = client().admin().cluster().prepareNodesStats().setIndices(true)
+nodesStats = client().admin().cluster().prepareNodesStats("data:true").setIndices(true)
.execute().actionGet();
assertThat(nodesStats.getNodes().get(0).getIndices().getFieldData().getMemorySizeInBytes() + nodesStats.getNodes().get(1).getIndices().getFieldData().getMemorySizeInBytes(), equalTo(0L));
assertThat(nodesStats.getNodes().get(0).getIndices().getQueryCache().getMemorySizeInBytes() + nodesStats.getNodes().get(1).getIndices().getQueryCache().getMemorySizeInBytes(), equalTo(0L));


@@ -36,7 +36,7 @@ import static org.hamcrest.Matchers.is;
/**
* Tests that when disabling detailed errors, a request with the error_trace parameter returns a HTTP 400
*/
-@ClusterScope(scope = Scope.TEST, numDataNodes = 1)
+@ClusterScope(scope = Scope.TEST, supportsDedicatedMasters = false, numDataNodes = 1)
public class DetailedErrorsDisabledIT extends ESIntegTestCase {
// Build our cluster settings
@Override


@@ -23,7 +23,6 @@ import org.apache.http.impl.client.HttpClients;
import org.elasticsearch.common.network.NetworkModule;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.http.HttpServerTransport;
-import org.elasticsearch.node.Node;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
import org.elasticsearch.test.ESIntegTestCase.Scope;
@@ -37,7 +36,7 @@ import static org.hamcrest.Matchers.not;
/**
* Tests that by default the error_trace parameter can be used to show stacktraces
*/
-@ClusterScope(scope = Scope.TEST, numDataNodes = 1)
+@ClusterScope(scope = Scope.TEST, supportsDedicatedMasters = false, numDataNodes = 1)
public class DetailedErrorsEnabledIT extends ESIntegTestCase {
@Override
protected Settings nodeSettings(int nodeOrdinal) {


@@ -35,7 +35,7 @@ import static org.hamcrest.Matchers.equalTo;
/**
* Test a rest action that sets special response headers
*/
-@ClusterScope(scope = Scope.SUITE, numDataNodes = 1)
+@ClusterScope(scope = Scope.SUITE, supportsDedicatedMasters = false, numDataNodes = 1)
public class ResponseHeaderPluginIT extends ESIntegTestCase {
@Override
protected Settings nodeSettings(int nodeOrdinal) {
@@ -60,4 +60,4 @@ public class ResponseHeaderPluginIT extends ESIntegTestCase {
assertThat(authResponse, hasStatus(OK));
assertThat(authResponse.getHeaders().get("Secret"), equalTo("granted"));
}
}


@@ -32,7 +32,7 @@ import static org.hamcrest.Matchers.not;
/**
*
*/
-@ClusterScope(scope = ESIntegTestCase.Scope.SUITE, numDataNodes = 1)
+@ClusterScope(scope = ESIntegTestCase.Scope.SUITE, supportsDedicatedMasters = false, numDataNodes = 1)
public class CorsNotSetIT extends ESIntegTestCase {
@Override


@@ -39,7 +39,7 @@ import static org.hamcrest.Matchers.not;
/**
* Test CORS where the allow origin value is a regular expression.
*/
-@ClusterScope(scope = Scope.SUITE, numDataNodes = 1)
+@ClusterScope(scope = Scope.SUITE, supportsDedicatedMasters = false, numDataNodes = 1)
public class CorsRegexIT extends ESIntegTestCase {
protected static final ESLogger logger = Loggers.getLogger(CorsRegexIT.class);


@@ -58,7 +58,7 @@ import static org.hamcrest.CoreMatchers.equalTo;
/**
*
*/
-@ClusterScope(scope = Scope.SUITE, numDataNodes = 1)
+@ClusterScope(scope = Scope.SUITE, supportsDedicatedMasters = false, numDataNodes = 1)
public class FetchSubPhasePluginIT extends ESIntegTestCase {
@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {


@@ -56,7 +56,7 @@ import static org.elasticsearch.search.builder.SearchSourceBuilder.searchSource;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
-@ClusterScope(scope = Scope.SUITE, numDataNodes = 1)
+@ClusterScope(scope = Scope.SUITE, supportsDedicatedMasters = false, numDataNodes = 1)
public class ExplainableScriptIT extends ESIntegTestCase {
@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {


@@ -53,7 +53,7 @@ import static org.hamcrest.Matchers.equalTo;
/**
*
*/
-@ClusterScope(scope = Scope.SUITE, numDataNodes = 1)
+@ClusterScope(scope = Scope.SUITE, supportsDedicatedMasters = false, numDataNodes = 1)
public class FunctionScorePluginIT extends ESIntegTestCase {
@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {

View File

@ -37,7 +37,7 @@ import static org.hamcrest.Matchers.equalTo;
/** /**
* *
*/ */
@ClusterScope(scope = Scope.SUITE, numDataNodes = 1) @ClusterScope(scope = Scope.SUITE, supportsDedicatedMasters = false, numDataNodes = 1)
public class CustomHighlighterSearchIT extends ESIntegTestCase { public class CustomHighlighterSearchIT extends ESIntegTestCase {
@Override @Override

View File

@ -71,7 +71,8 @@ public abstract class AbstractSnapshotIntegTestCase extends ESIntegTestCase {
public static long getFailureCount(String repository) { public static long getFailureCount(String repository) {
long failureCount = 0; long failureCount = 0;
for (RepositoriesService repositoriesService : internalCluster().getDataNodeInstances(RepositoriesService.class)) { for (RepositoriesService repositoriesService :
internalCluster().getDataOrMasterNodeInstances(RepositoriesService.class)) {
MockRepository mockRepository = (MockRepository) repositoriesService.repository(repository); MockRepository mockRepository = (MockRepository) repositoriesService.repository(repository);
failureCount += mockRepository.getFailureCount(); failureCount += mockRepository.getFailureCount();
} }

View File

@ -563,6 +563,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
} }
} }
} catch (Exception ex) { } catch (Exception ex) {
logger.info("--> caught a top level exception, asserting what's expected", ex);
assertThat(getFailureCount("test-repo"), greaterThan(0L)); assertThat(getFailureCount("test-repo"), greaterThan(0L));
assertThat(ExceptionsHelper.detailedMessage(ex), containsString("IOException")); assertThat(ExceptionsHelper.detailedMessage(ex), containsString("IOException"));
} }

View File

@ -53,7 +53,7 @@ import static org.hamcrest.Matchers.is;
/** /**
* *
*/ */
@ClusterScope(scope = Scope.TEST, numDataNodes = 1) @ClusterScope(scope = Scope.TEST, supportsDedicatedMasters = false, numDataNodes = 1)
public class NettyTransportIT extends ESIntegTestCase { public class NettyTransportIT extends ESIntegTestCase {
// static so we can use it in anonymous classes // static so we can use it in anonymous classes
private static String channelProfileName = null; private static String channelProfileName = null;

View File

@ -47,7 +47,7 @@ import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.hamcrest.Matchers.lessThanOrEqualTo;
@ClusterScope(scope = Scope.SUITE, numDataNodes = 1, numClientNodes = 0) @ClusterScope(scope = Scope.SUITE, supportsDedicatedMasters = false, numDataNodes = 1, numClientNodes = 0)
public class NettyTransportMultiPortIntegrationIT extends ESIntegTestCase { public class NettyTransportMultiPortIntegrationIT extends ESIntegTestCase {
private static int randomPort = -1; private static int randomPort = -1;

View File

@ -94,7 +94,7 @@ public class TribeIT extends ESIntegTestCase {
} }
}; };
cluster2 = new InternalTestCluster(InternalTestCluster.configuredNodeMode(), randomLong(), createTempDir(), 2, 2, cluster2 = new InternalTestCluster(InternalTestCluster.configuredNodeMode(), randomLong(), createTempDir(), true, 2, 2,
UUIDs.randomBase64UUID(random()), nodeConfigurationSource, 0, false, SECOND_CLUSTER_NODE_PREFIX, Collections.emptyList(), Function.identity()); UUIDs.randomBase64UUID(random()), nodeConfigurationSource, 0, false, SECOND_CLUSTER_NODE_PREFIX, Collections.emptyList(), Function.identity());
cluster2.beforeTest(random(), 0.1); cluster2.beforeTest(random(), 0.1);

View File

@ -49,7 +49,7 @@ import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.Matchers.nullValue;
@ClusterScope(scope= Scope.SUITE, numDataNodes = 1) @ClusterScope(scope= Scope.SUITE, supportsDedicatedMasters = false, numDataNodes = 1)
public class SimpleTTLIT extends ESIntegTestCase { public class SimpleTTLIT extends ESIntegTestCase {
static private final long PURGE_INTERVAL = 200; static private final long PURGE_INTERVAL = 200;

View File

@ -148,13 +148,19 @@ You can use the `@ClusterScope` annotation at class level to configure this beha
[source,java] [source,java]
----------------------------------------- -----------------------------------------
@ClusterScope(scope=TEST, numNodes=1) @ClusterScope(scope=TEST, numDataNodes=1)
public class CustomSuggesterSearchTests extends ESIntegTestCase { public class CustomSuggesterSearchTests extends ESIntegTestCase {
// ... tests go here // ... tests go here
} }
----------------------------------------- -----------------------------------------
The above sample configures the test to use a new cluster for each test method. The default scope is `SUITE` (one cluster for all test methods in the test). The `numNodes` settings allows you to only start a certain number of nodes, which can speed up test execution, as starting a new node is a costly and time consuming operation and might not be needed for this test. The above sample configures the test to use a new cluster for each test method. The default scope is `SUITE` (one cluster for all
test methods in the test). The `numDataNodes` setting allows you to only start a certain number of data nodes, which can speed up test
execution, as starting a new node is a costly and time-consuming operation and might not be needed for this test.
By default, the testing infrastructure will randomly start dedicated master nodes. If you want to disable dedicated masters,
you can set `supportsDedicatedMasters=false` in the same fashion as the `numDataNodes` setting. If dedicated master nodes are not used,
data nodes will be allowed to become masters as well.
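For example, a hypothetical suite that disables dedicated masters and runs against a single data node could be annotated as follows (an illustrative sketch mirroring the sample above; the class name is made up):
[source,java]
-----------------------------------------
@ClusterScope(scope=SUITE, supportsDedicatedMasters=false, numDataNodes=1)
public class MyCustomSuggesterSearchTests extends ESIntegTestCase {
    // ... tests go here
}
-----------------------------------------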
[[changing-node-configuration]] [[changing-node-configuration]]

View File

@ -277,9 +277,9 @@ public class AzureDiscoveryClusterFormationTests extends ESIntegTestCase {
public void testJoin() throws ExecutionException, InterruptedException { public void testJoin() throws ExecutionException, InterruptedException {
// only wait for the cluster to form // only wait for the cluster to form
assertNoTimeout(client().admin().cluster().prepareHealth().setWaitForNodes(Integer.toString(2)).get()); ensureClusterSizeConsistency();
// add one more node and wait for it to join // add one more node and wait for it to join
internalCluster().startDataOnlyNodeAsync().get(); internalCluster().startDataOnlyNodeAsync().get();
assertNoTimeout(client().admin().cluster().prepareHealth().setWaitForNodes(Integer.toString(3)).get()); ensureClusterSizeConsistency();
} }
} }

View File

@ -57,7 +57,7 @@ import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoTi
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
@ESIntegTestCase.SuppressLocalMode @ESIntegTestCase.SuppressLocalMode
@ESIntegTestCase.ClusterScope(numDataNodes = 2, numClientNodes = 0) @ESIntegTestCase.ClusterScope(supportsDedicatedMasters = false, numDataNodes = 2, numClientNodes = 0)
@SuppressForbidden(reason = "use http server") @SuppressForbidden(reason = "use http server")
// TODO this should be a IT but currently all ITs in this project run against a real cluster // TODO this should be a IT but currently all ITs in this project run against a real cluster
public class Ec2DiscoveryClusterFormationTests extends ESIntegTestCase { public class Ec2DiscoveryClusterFormationTests extends ESIntegTestCase {

View File

@ -57,7 +57,7 @@ import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoTi
@ESIntegTestCase.SuppressLocalMode @ESIntegTestCase.SuppressLocalMode
@ESIntegTestCase.ClusterScope(numDataNodes = 2, numClientNodes = 0) @ESIntegTestCase.ClusterScope(supportsDedicatedMasters = false, numDataNodes = 2, numClientNodes = 0)
@SuppressForbidden(reason = "use http server") @SuppressForbidden(reason = "use http server")
// TODO this should be a IT but currently all ITs in this project run against a real cluster // TODO this should be a IT but currently all ITs in this project run against a real cluster
public class GceDiscoverTests extends ESIntegTestCase { public class GceDiscoverTests extends ESIntegTestCase {

View File

@ -36,6 +36,7 @@ import static org.hamcrest.Matchers.greaterThan;
@ESIntegTestCase.ClusterScope( @ESIntegTestCase.ClusterScope(
scope = ESIntegTestCase.Scope.SUITE, scope = ESIntegTestCase.Scope.SUITE,
supportsDedicatedMasters = false,
numDataNodes = 1, numDataNodes = 1,
numClientNodes = 0, numClientNodes = 0,
transportClientRatio = 0.0) transportClientRatio = 0.0)

View File

@ -62,7 +62,7 @@ import static org.hamcrest.Matchers.greaterThan;
*/ */
@ClusterScope( @ClusterScope(
scope = ESIntegTestCase.Scope.SUITE, scope = ESIntegTestCase.Scope.SUITE,
numDataNodes = 1, supportsDedicatedMasters = false, numDataNodes = 1,
transportClientRatio = 0.0) transportClientRatio = 0.0)
public class AzureSnapshotRestoreTests extends AbstractAzureWithThirdPartyIntegTestCase { public class AzureSnapshotRestoreTests extends AbstractAzureWithThirdPartyIntegTestCase {
private String getRepositoryPath() { private String getRepositoryPath() {

View File

@ -108,8 +108,8 @@ import org.elasticsearch.index.codec.CodecService;
import org.elasticsearch.index.mapper.DocumentMapper; import org.elasticsearch.index.mapper.DocumentMapper;
import org.elasticsearch.index.mapper.internal.TimestampFieldMapper; import org.elasticsearch.index.mapper.internal.TimestampFieldMapper;
import org.elasticsearch.index.translog.Translog; import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.IndicesRequestCache; import org.elasticsearch.indices.IndicesRequestCache;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.store.IndicesStore; import org.elasticsearch.indices.store.IndicesStore;
import org.elasticsearch.node.NodeMocksPlugin; import org.elasticsearch.node.NodeMocksPlugin;
import org.elasticsearch.plugins.Plugin; import org.elasticsearch.plugins.Plugin;
@ -1026,7 +1026,7 @@ public abstract class ESIntegTestCase extends ESTestCase {
logger.info("memory: {}", XContentHelper.toString(client().admin().cluster().prepareNodesStats().clear().setJvm(true).get())); logger.info("memory: {}", XContentHelper.toString(client().admin().cluster().prepareNodesStats().clear().setJvm(true).get()));
} }
void ensureClusterSizeConsistency() { protected void ensureClusterSizeConsistency() {
if (cluster() != null) { // if static init fails the cluster can be null if (cluster() != null) { // if static init fails the cluster can be null
logger.trace("Check consistency for [{}] nodes", cluster().size()); logger.trace("Check consistency for [{}] nodes", cluster().size());
assertNoTimeout(client().admin().cluster().prepareHealth().setWaitForNodes(Integer.toString(cluster().size())).get()); assertNoTimeout(client().admin().cluster().prepareHealth().setWaitForNodes(Integer.toString(cluster().size())).get());
@ -1478,17 +1478,24 @@ public abstract class ESIntegTestCase extends ESTestCase {
int numDataNodes() default -1; int numDataNodes() default -1;
/** /**
* Returns the minimum number of nodes in the cluster. Default is <tt>-1</tt>. * Returns the minimum number of data nodes in the cluster. Default is <tt>-1</tt>.
* Ignored when {@link ClusterScope#numDataNodes()} is set. * Ignored when {@link ClusterScope#numDataNodes()} is set.
*/ */
int minNumDataNodes() default -1; int minNumDataNodes() default -1;
/** /**
* Returns the maximum number of nodes in the cluster. Default is <tt>-1</tt>. * Returns the maximum number of data nodes in the cluster. Default is <tt>-1</tt>.
* Ignored when {@link ClusterScope#numDataNodes()} is set. * Ignored when {@link ClusterScope#numDataNodes()} is set.
*/ */
int maxNumDataNodes() default -1; int maxNumDataNodes() default -1;
/**
* Indicates whether the cluster can have dedicated master nodes. If <tt>false</tt>, data nodes will serve as master nodes * Indicates whether the cluster can have dedicated master nodes. If <tt>false</tt>, data nodes will serve as master nodes
* and there will be no dedicated master (or dedicated data) nodes. Default is <tt>true</tt>, which means * and there will be no dedicated master (or dedicated data) nodes. Default is <tt>true</tt>, which means
* dedicated master nodes will be randomly used.
*/
boolean supportsDedicatedMasters() default true;
/** /**
* Returns the number of client nodes in the cluster. Default is {@link InternalTestCluster#DEFAULT_NUM_CLIENT_NODES}, a * Returns the number of client nodes in the cluster. Default is {@link InternalTestCluster#DEFAULT_NUM_CLIENT_NODES}, a
* negative value means that the number of client nodes will be randomized. * negative value means that the number of client nodes will be randomized.
@ -1571,7 +1578,6 @@ public abstract class ESIntegTestCase extends ESTestCase {
return getAnnotation(clazz.getSuperclass(), annotationClass); return getAnnotation(clazz.getSuperclass(), annotationClass);
} }
private Scope getCurrentClusterScope() { private Scope getCurrentClusterScope() {
return getCurrentClusterScope(this.getClass()); return getCurrentClusterScope(this.getClass());
} }
@ -1582,6 +1588,11 @@ public abstract class ESIntegTestCase extends ESTestCase {
return annotation == null ? Scope.SUITE : annotation.scope(); return annotation == null ? Scope.SUITE : annotation.scope();
} }
private boolean getSupportsDedicatedMasters() {
ClusterScope annotation = getAnnotation(this.getClass(), ClusterScope.class);
return annotation == null ? true : annotation.supportsDedicatedMasters();
}
private int getNumDataNodes() { private int getNumDataNodes() {
ClusterScope annotation = getAnnotation(this.getClass(), ClusterScope.class); ClusterScope annotation = getAnnotation(this.getClass(), ClusterScope.class);
return annotation == null ? -1 : annotation.numDataNodes(); return annotation == null ? -1 : annotation.numDataNodes();
@ -1717,6 +1728,7 @@ public abstract class ESIntegTestCase extends ESTestCase {
} }
}; };
boolean supportsDedicatedMasters = getSupportsDedicatedMasters();
int numDataNodes = getNumDataNodes(); int numDataNodes = getNumDataNodes();
int minNumDataNodes; int minNumDataNodes;
int maxNumDataNodes; int maxNumDataNodes;
@ -1739,7 +1751,7 @@ public abstract class ESIntegTestCase extends ESTestCase {
Collection<Class<? extends Plugin>> mockPlugins = getMockPlugins(); Collection<Class<? extends Plugin>> mockPlugins = getMockPlugins();
return new InternalTestCluster(nodeMode, seed, createTempDir(), minNumDataNodes, maxNumDataNodes, return new InternalTestCluster(nodeMode, seed, createTempDir(), supportsDedicatedMasters, minNumDataNodes, maxNumDataNodes,
InternalTestCluster.clusterName(scope.name(), seed) + "-cluster", nodeConfigurationSource, getNumClientNodes(), InternalTestCluster.clusterName(scope.name(), seed) + "-cluster", nodeConfigurationSource, getNumClientNodes(),
InternalTestCluster.DEFAULT_ENABLE_HTTP_PIPELINING, nodePrefix, mockPlugins, getClientWrapper()); InternalTestCluster.DEFAULT_ENABLE_HTTP_PIPELINING, nodePrefix, mockPlugins, getClientWrapper());
} }
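For a concrete picture of how the annotation elements above combine, the sketch below shows a hypothetical test class that opts out of dedicated masters; the class name and assertion are illustrative only and are not part of this commit.

import org.elasticsearch.test.ESIntegTestCase;

// Hypothetical test class, shown only to illustrate supportsDedicatedMasters together with numDataNodes.
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.SUITE, supportsDedicatedMasters = false, numDataNodes = 2)
public class ExampleNoDedicatedMastersIT extends ESIntegTestCase {
    public void testDataNodesAreMasterEligible() {
        // with supportsDedicatedMasters = false no master-only nodes are created,
        // so the shared cluster consists of the two data nodes (plus any random coordinating-only nodes)
        assertEquals(2, internalCluster().numDataNodes());
    }
}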

View File

@ -161,7 +161,9 @@ public final class InternalTestCluster extends TestCluster {
private static final int JVM_ORDINAL = Integer.parseInt(System.getProperty(SysGlobals.CHILDVM_SYSPROP_JVM_ID, "0")); private static final int JVM_ORDINAL = Integer.parseInt(System.getProperty(SysGlobals.CHILDVM_SYSPROP_JVM_ID, "0"));
/** a per-JVM unique offset to be used for calculating unique port ranges. */ /**
* a per-JVM unique offset to be used for calculating unique port ranges.
*/
public static final int JVM_BASE_PORT_OFFSET = PORTS_PER_JVM * (JVM_ORDINAL + 1); public static final int JVM_BASE_PORT_OFFSET = PORTS_PER_JVM * (JVM_ORDINAL + 1);
private static final AtomicInteger clusterOrdinal = new AtomicInteger(); private static final AtomicInteger clusterOrdinal = new AtomicInteger();
@ -171,6 +173,9 @@ public final class InternalTestCluster extends TestCluster {
public final int HTTP_BASE_PORT = GLOBAL_HTTP_BASE_PORT + CLUSTER_BASE_PORT_OFFSET; public final int HTTP_BASE_PORT = GLOBAL_HTTP_BASE_PORT + CLUSTER_BASE_PORT_OFFSET;
static final int DEFAULT_LOW_NUM_MASTER_NODES = 1;
static final int DEFAULT_HIGH_NUM_MASTER_NODES = 3;
static final int DEFAULT_MIN_NUM_DATA_NODES = 1; static final int DEFAULT_MIN_NUM_DATA_NODES = 1;
static final int DEFAULT_MAX_NUM_DATA_NODES = TEST_NIGHTLY ? 6 : 3; static final int DEFAULT_MAX_NUM_DATA_NODES = TEST_NIGHTLY ? 6 : 3;
@ -198,9 +203,13 @@ public final class InternalTestCluster extends TestCluster {
* fully shared cluster to be more reproducible */ * fully shared cluster to be more reproducible */
private final long[] sharedNodesSeeds; private final long[] sharedNodesSeeds;
private final int numSharedAllRolesNodes;
private final int numShareCoordOnlyNodes; // if set to 0, data nodes will also assume the master role
private final int numSharedDedicatedMasterNodes;
private final int numSharedDataNodes;
private final int numSharedCoordOnlyNodes;
private final NodeConfigurationSource nodeConfigurationSource; private final NodeConfigurationSource nodeConfigurationSource;
@ -219,6 +228,7 @@ public final class InternalTestCluster extends TestCluster {
private Function<Client, Client> clientWrapper; private Function<Client, Client> clientWrapper;
public InternalTestCluster(String nodeMode, long clusterSeed, Path baseDir, public InternalTestCluster(String nodeMode, long clusterSeed, Path baseDir,
boolean randomlyAddDedicatedMasters,
int minNumDataNodes, int maxNumDataNodes, String clusterName, NodeConfigurationSource nodeConfigurationSource, int numClientNodes, int minNumDataNodes, int maxNumDataNodes, String clusterName, NodeConfigurationSource nodeConfigurationSource, int numClientNodes,
boolean enableHttpPipelining, String nodePrefix, Collection<Class<? extends Plugin>> mockPlugins, Function<Client, Client> clientWrapper) { boolean enableHttpPipelining, String nodePrefix, Collection<Class<? extends Plugin>> mockPlugins, Function<Client, Client> clientWrapper) {
super(clusterSeed); super(clusterSeed);
@ -239,39 +249,47 @@ public final class InternalTestCluster extends TestCluster {
Random random = new Random(clusterSeed); Random random = new Random(clusterSeed);
this.numSharedAllRolesNodes = RandomInts.randomIntBetween(random, minNumDataNodes, maxNumDataNodes); boolean useDedicatedMasterNodes = randomlyAddDedicatedMasters ? random.nextBoolean() : false;
assert this.numSharedAllRolesNodes >= 0;
//for now all shared data nodes are also master eligible this.numSharedDataNodes = RandomInts.randomIntBetween(random, minNumDataNodes, maxNumDataNodes);
if (numSharedAllRolesNodes == 0) { assert this.numSharedDataNodes >= 0;
this.numShareCoordOnlyNodes = 0;
if (numSharedDataNodes == 0) {
this.numSharedCoordOnlyNodes = 0;
this.numSharedDedicatedMasterNodes = 0;
} else { } else {
if (numClientNodes < 0) { if (useDedicatedMasterNodes) {
this.numShareCoordOnlyNodes = RandomInts.randomIntBetween(random, DEFAULT_MIN_NUM_CLIENT_NODES, DEFAULT_MAX_NUM_CLIENT_NODES); if (random.nextBoolean()) {
// use a dedicated master, but only low number to reduce overhead to tests
this.numSharedDedicatedMasterNodes = DEFAULT_LOW_NUM_MASTER_NODES;
} else {
this.numSharedDedicatedMasterNodes = DEFAULT_HIGH_NUM_MASTER_NODES;
}
} else { } else {
this.numShareCoordOnlyNodes = numClientNodes; this.numSharedDedicatedMasterNodes = 0;
}
if (numClientNodes < 0) {
this.numSharedCoordOnlyNodes = RandomInts.randomIntBetween(random, DEFAULT_MIN_NUM_CLIENT_NODES, DEFAULT_MAX_NUM_CLIENT_NODES);
} else {
this.numSharedCoordOnlyNodes = numClientNodes;
} }
} }
assert this.numShareCoordOnlyNodes >= 0; assert this.numSharedCoordOnlyNodes >= 0;
this.nodePrefix = nodePrefix; this.nodePrefix = nodePrefix;
assert nodePrefix != null; assert nodePrefix != null;
this.mockPlugins = mockPlugins; this.mockPlugins = mockPlugins;
/* sharedNodesSeeds = new long[numSharedDedicatedMasterNodes + numSharedDataNodes + numSharedCoordOnlyNodes];
* TODO
* - we might want start some master only nodes?
* - we could add a flag that returns a client to the master all the time?
* - we could add a flag that never returns a client to the master
* - along those lines use a dedicated node that is master eligible and let all other nodes be only data nodes
*/
sharedNodesSeeds = new long[numSharedAllRolesNodes + numShareCoordOnlyNodes];
for (int i = 0; i < sharedNodesSeeds.length; i++) { for (int i = 0; i < sharedNodesSeeds.length; i++) {
sharedNodesSeeds[i] = random.nextLong(); sharedNodesSeeds[i] = random.nextLong();
} }
logger.info("Setup InternalTestCluster [{}] with seed [{}] using [{}] data nodes and [{}] client nodes", clusterName, SeedUtils.formatSeed(clusterSeed), numSharedAllRolesNodes, numShareCoordOnlyNodes); logger.info("Setup InternalTestCluster [{}] with seed [{}] using [{}] dedicated masters, " +
"[{}] (data) nodes and [{}] coord only nodes",
clusterName, SeedUtils.formatSeed(clusterSeed),
numSharedDedicatedMasterNodes, numSharedDataNodes, numSharedCoordOnlyNodes);
this.nodeConfigurationSource = nodeConfigurationSource; this.nodeConfigurationSource = nodeConfigurationSource;
Builder builder = Settings.builder(); Builder builder = Settings.builder();
if (random.nextInt(5) == 0) { // sometimes set this if (random.nextInt(5) == 0) { // sometimes set this
@ -348,7 +366,16 @@ public final class InternalTestCluster extends TestCluster {
private Settings getSettings(int nodeOrdinal, long nodeSeed, Settings others) { private Settings getSettings(int nodeOrdinal, long nodeSeed, Settings others) {
Builder builder = Settings.builder().put(defaultSettings) Builder builder = Settings.builder().put(defaultSettings)
.put(getRandomNodeSettings(nodeSeed)); .put(getRandomNodeSettings(nodeSeed));
Settings interimSettings = builder.build();
final String dataSuffix = getRoleSuffix(interimSettings);
if (dataSuffix.isEmpty() == false) {
// to make sure that a master node will not pick up the data folder of a data-only node
// once restarted, we append the role suffix to each path.
String[] dataPath = Environment.PATH_DATA_SETTING.get(interimSettings).stream()
.map(path -> path + dataSuffix).toArray(String[]::new);
builder.putArray(Environment.PATH_DATA_SETTING.getKey(), dataPath);
}
Settings settings = nodeConfigurationSource.nodeSettings(nodeOrdinal); Settings settings = nodeConfigurationSource.nodeSettings(nodeOrdinal);
if (settings != null) { if (settings != null) {
if (settings.get(ClusterName.CLUSTER_NAME_SETTING.getKey()) != null) { if (settings.get(ClusterName.CLUSTER_NAME_SETTING.getKey()) != null) {
@ -506,7 +533,11 @@ public final class InternalTestCluster extends TestCluster {
int size = numDataNodes(); int size = numDataNodes();
for (int i = size; i < n; i++) { for (int i = size; i < n; i++) {
logger.info("increasing cluster size from {} to {}", size, n); logger.info("increasing cluster size from {} to {}", size, n);
asyncs.add(startNodeAsync()); if (numSharedDedicatedMasterNodes > 0) {
asyncs.add(startDataOnlyNodeAsync());
} else {
asyncs.add(startNodeAsync());
}
} }
} }
try { try {
@ -535,10 +566,10 @@ public final class InternalTestCluster extends TestCluster {
} }
// prevent killing the master if possible and client nodes // prevent killing the master if possible and client nodes
final Stream<NodeAndClient> collection = final Stream<NodeAndClient> collection =
n == 0 ? nodes.values().stream() : nodes.values().stream().filter(new DataNodePredicate().and(new MasterNodePredicate(getMasterName()).negate())); n == 0 ? nodes.values().stream() : nodes.values().stream().filter(new DataNodePredicate().and(new MasterNodePredicate(getMasterName()).negate()));
final Iterator<NodeAndClient> values = collection.iterator(); final Iterator<NodeAndClient> values = collection.iterator();
logger.info("changing cluster size from {} to {}, {} data nodes", size(), n + numShareCoordOnlyNodes, n); logger.info("changing cluster size from {} to {}, {} data nodes", size(), n + numSharedCoordOnlyNodes, n);
Set<NodeAndClient> nodesToRemove = new HashSet<>(); Set<NodeAndClient> nodesToRemove = new HashSet<>();
int numNodesAndClients = 0; int numNodesAndClients = 0;
while (values.hasNext() && numNodesAndClients++ < size - n) { while (values.hasNext() && numNodesAndClients++ < size - n) {
@ -557,33 +588,59 @@ public final class InternalTestCluster extends TestCluster {
private NodeAndClient buildNode(Settings settings, Version version) { private NodeAndClient buildNode(Settings settings, Version version) {
int ord = nextNodeId.getAndIncrement(); int ord = nextNodeId.getAndIncrement();
return buildNode(ord, random.nextLong(), settings, version); return buildNode(ord, random.nextLong(), settings, version, false);
} }
private NodeAndClient buildNode() { private NodeAndClient buildNode() {
int ord = nextNodeId.getAndIncrement(); int ord = nextNodeId.getAndIncrement();
return buildNode(ord, random.nextLong(), null, Version.CURRENT); return buildNode(ord, random.nextLong(), null, Version.CURRENT, false);
} }
private NodeAndClient buildNode(int nodeId, long seed, Settings settings, Version version) { private NodeAndClient buildNode(int nodeId, long seed, Settings settings, Version version, boolean reuseExisting) {
assert Thread.holdsLock(this); assert Thread.holdsLock(this);
ensureOpen(); ensureOpen();
settings = getSettings(nodeId, seed, settings); settings = getSettings(nodeId, seed, settings);
Collection<Class<? extends Plugin>> plugins = getPlugins(); Collection<Class<? extends Plugin>> plugins = getPlugins();
String name = buildNodeName(nodeId); String name = buildNodeName(nodeId, settings);
assert !nodes.containsKey(name); if (reuseExisting && nodes.containsKey(name)) {
return nodes.get(name);
} else {
assert reuseExisting == true || nodes.containsKey(name) == false :
"node name [" + name + "] already exists but not allowed to use it";
}
Settings finalSettings = Settings.builder() Settings finalSettings = Settings.builder()
.put(Environment.PATH_HOME_SETTING.getKey(), baseDir) // allow overriding path.home .put(Environment.PATH_HOME_SETTING.getKey(), baseDir) // allow overriding path.home
.put(settings) .put(settings)
.put("node.name", name) .put("node.name", name)
.put(DiscoveryNodeService.NODE_ID_SEED_SETTING.getKey(), seed) .put(DiscoveryNodeService.NODE_ID_SEED_SETTING.getKey(), seed)
.build(); .build();
MockNode node = new MockNode(finalSettings, version, plugins); MockNode node = new MockNode(finalSettings, version, plugins);
return new NodeAndClient(name, node); return new NodeAndClient(name, node);
} }
private String buildNodeName(int id) { private String buildNodeName(int id, Settings settings) {
return nodePrefix + id; String prefix = nodePrefix;
prefix = prefix + getRoleSuffix(settings);
return prefix + id;
}
/**
* Returns a suffix string based on the node's role. If no explicit role is defined, the suffix will be empty.
*/
private String getRoleSuffix(Settings settings) {
String suffix = "";
if (Node.NODE_MASTER_SETTING.exists(settings) && Node.NODE_MASTER_SETTING.get(settings)) {
suffix = suffix + DiscoveryNode.Role.MASTER.getAbbreviation();
}
if (Node.NODE_DATA_SETTING.exists(settings) && Node.NODE_DATA_SETTING.get(settings)) {
suffix = suffix + DiscoveryNode.Role.DATA.getAbbreviation();
}
if (Node.NODE_MASTER_SETTING.exists(settings) && Node.NODE_MASTER_SETTING.get(settings) == false &&
Node.NODE_DATA_SETTING.exists(settings) && Node.NODE_DATA_SETTING.get(settings) == false
) {
suffix = suffix + "c";
}
return suffix;
} }
/** /**
@ -873,14 +930,14 @@ public final class InternalTestCluster extends TestCluster {
TransportAddress addr = node.injector().getInstance(TransportService.class).boundAddress().publishAddress(); TransportAddress addr = node.injector().getInstance(TransportService.class).boundAddress().publishAddress();
Settings nodeSettings = node.settings(); Settings nodeSettings = node.settings();
Builder builder = Settings.builder() Builder builder = Settings.builder()
.put("client.transport.nodes_sampler_interval", "1s") .put("client.transport.nodes_sampler_interval", "1s")
.put(Environment.PATH_HOME_SETTING.getKey(), baseDir) .put(Environment.PATH_HOME_SETTING.getKey(), baseDir)
.put("node.name", TRANSPORT_CLIENT_PREFIX + node.settings().get("node.name")) .put("node.name", TRANSPORT_CLIENT_PREFIX + node.settings().get("node.name"))
.put(ClusterName.CLUSTER_NAME_SETTING.getKey(), clusterName).put("client.transport.sniff", sniff) .put(ClusterName.CLUSTER_NAME_SETTING.getKey(), clusterName).put("client.transport.sniff", sniff)
.put(Node.NODE_MODE_SETTING.getKey(), Node.NODE_MODE_SETTING.exists(nodeSettings) ? Node.NODE_MODE_SETTING.get(nodeSettings) : nodeMode) .put(Node.NODE_MODE_SETTING.getKey(), Node.NODE_MODE_SETTING.exists(nodeSettings) ? Node.NODE_MODE_SETTING.get(nodeSettings) : nodeMode)
.put("logger.prefix", nodeSettings.get("logger.prefix", "")) .put("logger.prefix", nodeSettings.get("logger.prefix", ""))
.put("logger.level", nodeSettings.get("logger.level", "INFO")) .put("logger.level", nodeSettings.get("logger.level", "INFO"))
.put(settings); .put(settings);
if (Node.NODE_LOCAL_SETTING.exists(nodeSettings)) { if (Node.NODE_LOCAL_SETTING.exists(nodeSettings)) {
builder.put(Node.NODE_LOCAL_SETTING.getKey(), Node.NODE_LOCAL_SETTING.get(nodeSettings)); builder.put(Node.NODE_LOCAL_SETTING.getKey(), Node.NODE_LOCAL_SETTING.get(nodeSettings));
@ -924,39 +981,35 @@ public final class InternalTestCluster extends TestCluster {
Set<NodeAndClient> sharedNodes = new HashSet<>(); Set<NodeAndClient> sharedNodes = new HashSet<>();
assert sharedNodesSeeds.length == numSharedAllRolesNodes + numShareCoordOnlyNodes; assert sharedNodesSeeds.length == numSharedDedicatedMasterNodes + numSharedDataNodes + numSharedCoordOnlyNodes;
boolean changed = false; for (int i = 0; i < numSharedDedicatedMasterNodes; i++) {
for (int i = 0; i < numSharedAllRolesNodes; i++) { final Settings.Builder settings = Settings.builder();
String buildNodeName = buildNodeName(i); settings.put(Node.NODE_MASTER_SETTING.getKey(), true);
NodeAndClient nodeAndClient = nodes.get(buildNodeName); settings.put(Node.NODE_DATA_SETTING.getKey(), false);
if (nodeAndClient == null) { NodeAndClient nodeAndClient = buildNode(i, sharedNodesSeeds[i], settings.build(), Version.CURRENT, true);
changed = true; nodeAndClient.node().start();
nodeAndClient = buildNode(i, sharedNodesSeeds[i], null, Version.CURRENT);
nodeAndClient.node.start();
logger.info("Start Shared Node [{}] not shared", nodeAndClient.name);
}
sharedNodes.add(nodeAndClient); sharedNodes.add(nodeAndClient);
} }
for (int i = numSharedAllRolesNodes; i < numSharedAllRolesNodes + numShareCoordOnlyNodes; i++) { for (int i = numSharedDedicatedMasterNodes; i < numSharedDedicatedMasterNodes + numSharedDataNodes; i++) {
String buildNodeName = buildNodeName(i); final Settings.Builder settings = Settings.builder();
NodeAndClient nodeAndClient = nodes.get(buildNodeName); if (numSharedDedicatedMasterNodes > 0) {
if (nodeAndClient == null) { // if we don't have dedicated master nodes, keep things default
changed = true; settings.put(Node.NODE_MASTER_SETTING.getKey(), false);
Builder clientSettingsBuilder = Settings.builder().put(Node.NODE_MASTER_SETTING.getKey(), false) settings.put(Node.NODE_DATA_SETTING.getKey(), true);
.put(Node.NODE_DATA_SETTING.getKey(), false).put(Node.NODE_INGEST_SETTING.getKey(), false);
nodeAndClient = buildNode(i, sharedNodesSeeds[i], clientSettingsBuilder.build(), Version.CURRENT);
nodeAndClient.node.start();
logger.info("Start Shared Node [{}] not shared", nodeAndClient.name);
} }
NodeAndClient nodeAndClient = buildNode(i, sharedNodesSeeds[i], settings.build(), Version.CURRENT, true);
nodeAndClient.node().start();
sharedNodes.add(nodeAndClient); sharedNodes.add(nodeAndClient);
} }
if (!changed && sharedNodes.size() == nodes.size()) { for (int i = numSharedDedicatedMasterNodes + numSharedDataNodes;
logger.debug("Cluster is consistent - moving out - nodes: [{}] nextNodeId: [{}] numSharedNodes: [{}]", nodes.keySet(), nextNodeId.get(), sharedNodesSeeds.length); i < numSharedDedicatedMasterNodes + numSharedDataNodes + numSharedCoordOnlyNodes; i++) {
if (size() > 0) { final Builder settings = Settings.builder().put(Node.NODE_MASTER_SETTING.getKey(), false)
client().admin().cluster().prepareHealth().setWaitForNodes(Integer.toString(sharedNodesSeeds.length)).get(); .put(Node.NODE_DATA_SETTING.getKey(), false).put(Node.NODE_INGEST_SETTING.getKey(), false);
} NodeAndClient nodeAndClient = buildNode(i, sharedNodesSeeds[i], settings.build(), Version.CURRENT, true);
return; // we are consistent - return nodeAndClient.node().start();
sharedNodes.add(nodeAndClient);
} }
for (NodeAndClient nodeAndClient : sharedNodes) { for (NodeAndClient nodeAndClient : sharedNodes) {
nodes.remove(nodeAndClient.name); nodes.remove(nodeAndClient.name);
} }
@ -1092,6 +1145,14 @@ public final class InternalTestCluster extends TestCluster {
return getInstances(clazz, new DataNodePredicate()); return getInstances(clazz, new DataNodePredicate());
} }
/**
* Returns an Iterable over all instances of the given class &lt;T&gt; across all data and master nodes
* in the cluster.
*/
public synchronized <T> Iterable<T> getDataOrMasterNodeInstances(Class<T> clazz) {
return getInstances(clazz, new DataOrMasterNodePredicate());
}
private synchronized <T> Iterable<T> getInstances(Class<T> clazz, Predicate<NodeAndClient> predicate) { private synchronized <T> Iterable<T> getInstances(Class<T> clazz, Predicate<NodeAndClient> predicate) {
Iterable<NodeAndClient> filteredNodes = nodes.values().stream().filter(predicate)::iterator; Iterable<NodeAndClient> filteredNodes = nodes.values().stream().filter(predicate)::iterator;
List<T> instances = new ArrayList<>(); List<T> instances = new ArrayList<>();
@ -1296,7 +1357,19 @@ public final class InternalTestCluster extends TestCluster {
} }
nodeAndClient.closeNode(); nodeAndClient.closeNode();
} }
for (NodeAndClient nodeAndClient : nodes.values()) {
// start master nodes first so that the restart is quick. If we started
// the data nodes first, they would wait 30s for a master
List<DiscoveryNode> discoveryNodes = new ArrayList<>();
for (ClusterService clusterService : getInstances(ClusterService.class)) {
discoveryNodes.add(clusterService.localNode());
}
discoveryNodes.sort((n1, n2) -> Boolean.compare(n1.isMasterNode() == false, n2.isMasterNode() == false));
for (DiscoveryNode node : discoveryNodes) {
NodeAndClient nodeAndClient = nodes.get(node.getName());
logger.info("Starting node [{}] ", nodeAndClient.name); logger.info("Starting node [{}] ", nodeAndClient.name);
if (activeDisruptionScheme != null) { if (activeDisruptionScheme != null) {
activeDisruptionScheme.removeFromNode(nodeAndClient.name, this); activeDisruptionScheme.removeFromNode(nodeAndClient.name, this);
@ -1375,11 +1448,11 @@ public final class InternalTestCluster extends TestCluster {
private synchronized Set<String> nRandomDataNodes(int numNodes) { private synchronized Set<String> nRandomDataNodes(int numNodes) {
assert size() >= numNodes; assert size() >= numNodes;
Map<String, NodeAndClient> dataNodes = Map<String, NodeAndClient> dataNodes =
nodes nodes
.entrySet() .entrySet()
.stream() .stream()
.filter(new EntryNodePredicate(new DataNodePredicate())) .filter(new EntryNodePredicate(new DataNodePredicate()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
final HashSet<String> set = new HashSet<>(); final HashSet<String> set = new HashSet<>();
final Iterator<String> iterator = dataNodes.keySet().iterator(); final Iterator<String> iterator = dataNodes.keySet().iterator();
for (int i = 0; i < numNodes; i++) { for (int i = 0; i < numNodes; i++) {
@ -1632,10 +1705,10 @@ public final class InternalTestCluster extends TestCluster {
private synchronized Collection<NodeAndClient> filterNodes(Map<String, InternalTestCluster.NodeAndClient> map, Predicate<NodeAndClient> predicate) { private synchronized Collection<NodeAndClient> filterNodes(Map<String, InternalTestCluster.NodeAndClient> map, Predicate<NodeAndClient> predicate) {
return map return map
.values() .values()
.stream() .stream()
.filter(predicate) .filter(predicate)
.collect(Collectors.toCollection(ArrayList::new)); .collect(Collectors.toCollection(ArrayList::new));
} }
private static final class DataNodePredicate implements Predicate<NodeAndClient> { private static final class DataNodePredicate implements Predicate<NodeAndClient> {
@ -1649,7 +1722,7 @@ public final class InternalTestCluster extends TestCluster {
@Override @Override
public boolean test(NodeAndClient nodeAndClient) { public boolean test(NodeAndClient nodeAndClient) {
return DiscoveryNode.isDataNode(nodeAndClient.node.settings()) || return DiscoveryNode.isDataNode(nodeAndClient.node.settings()) ||
DiscoveryNode.isMasterNode(nodeAndClient.node.settings()); DiscoveryNode.isMasterNode(nodeAndClient.node.settings());
} }
} }
@ -1887,6 +1960,7 @@ public final class InternalTestCluster extends TestCluster {
/** /**
* Simple interface that allows to wait for an async operation to finish * Simple interface that allows to wait for an async operation to finish
*
* @param <T> the result of the async execution * @param <T> the result of the async execution
*/ */
public interface Async<T> { public interface Async<T> {
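To make the master-node randomization in the constructor above easier to follow, here is a simplified restatement of how the number of shared dedicated master nodes is chosen. This is a sketch only, not code from the commit: the real constructor interleaves its random draws differently, which matters for seed reproducibility.

import java.util.Random;

// Illustrative sketch of the dedicated-master selection; mirrors the constructor logic above but is not verbatim.
final class MasterCountSketch {
    static int pickDedicatedMasterCount(Random random, boolean randomlyAddDedicatedMasters, int numSharedDataNodes) {
        boolean useDedicatedMasterNodes = randomlyAddDedicatedMasters && random.nextBoolean();
        if (numSharedDataNodes == 0 || useDedicatedMasterNodes == false) {
            return 0; // no dedicated masters; data nodes (if any) stay master eligible
        }
        // keep overhead low in most runs, but occasionally exercise a 3-master quorum
        return random.nextBoolean() ? 1 : 3; // DEFAULT_LOW_NUM_MASTER_NODES vs DEFAULT_HIGH_NUM_MASTER_NODES
    }
}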

View File

@ -49,6 +49,7 @@ public class InternalTestClusterTests extends ESTestCase {
public void testInitializiationIsConsistent() { public void testInitializiationIsConsistent() {
long clusterSeed = randomLong(); long clusterSeed = randomLong();
boolean masterNodes = randomBoolean();
int minNumDataNodes = randomIntBetween(0, 9); int minNumDataNodes = randomIntBetween(0, 9);
int maxNumDataNodes = randomIntBetween(minNumDataNodes, 10); int maxNumDataNodes = randomIntBetween(minNumDataNodes, 10);
String clusterName = randomRealisticUnicodeOfCodepointLengthBetween(1, 10); String clusterName = randomRealisticUnicodeOfCodepointLengthBetween(1, 10);
@ -58,8 +59,12 @@ public class InternalTestClusterTests extends ESTestCase {
String nodePrefix = randomRealisticUnicodeOfCodepointLengthBetween(1, 10); String nodePrefix = randomRealisticUnicodeOfCodepointLengthBetween(1, 10);
Path baseDir = createTempDir(); Path baseDir = createTempDir();
InternalTestCluster cluster0 = new InternalTestCluster("local", clusterSeed, baseDir, minNumDataNodes, maxNumDataNodes, clusterName, nodeConfigurationSource, numClientNodes, enableHttpPipelining, nodePrefix, Collections.emptyList(), Function.identity()); InternalTestCluster cluster0 = new InternalTestCluster("local", clusterSeed, baseDir, masterNodes,
InternalTestCluster cluster1 = new InternalTestCluster("local", clusterSeed, baseDir, minNumDataNodes, maxNumDataNodes, clusterName, nodeConfigurationSource, numClientNodes, enableHttpPipelining, nodePrefix, Collections.emptyList(), Function.identity()); minNumDataNodes, maxNumDataNodes, clusterName, nodeConfigurationSource, numClientNodes,
enableHttpPipelining, nodePrefix, Collections.emptyList(), Function.identity());
InternalTestCluster cluster1 = new InternalTestCluster("local", clusterSeed, baseDir, masterNodes,
minNumDataNodes, maxNumDataNodes, clusterName, nodeConfigurationSource, numClientNodes,
enableHttpPipelining, nodePrefix, Collections.emptyList(), Function.identity());
// TODO: this is not ideal - we should have a way to make sure ports are initialized in the same way // TODO: this is not ideal - we should have a way to make sure ports are initialized in the same way
assertClusters(cluster0, cluster1, false); assertClusters(cluster0, cluster1, false);
@ -101,6 +106,7 @@ public class InternalTestClusterTests extends ESTestCase {
public void testBeforeTest() throws Exception { public void testBeforeTest() throws Exception {
long clusterSeed = randomLong(); long clusterSeed = randomLong();
boolean masterNodes = randomBoolean();
int minNumDataNodes = randomIntBetween(0, 3); int minNumDataNodes = randomIntBetween(0, 3);
int maxNumDataNodes = randomIntBetween(minNumDataNodes, 4); int maxNumDataNodes = randomIntBetween(minNumDataNodes, 4);
final String clusterName1 = "shared1";//clusterName("shared1", clusterSeed); final String clusterName1 = "shared1";//clusterName("shared1", clusterSeed);
@ -115,8 +121,12 @@ public class InternalTestClusterTests extends ESTestCase {
String nodePrefix = "foobar"; String nodePrefix = "foobar";
Path baseDir = createTempDir(); Path baseDir = createTempDir();
InternalTestCluster cluster0 = new InternalTestCluster("local", clusterSeed, baseDir, minNumDataNodes, maxNumDataNodes, clusterName1, nodeConfigurationSource, numClientNodes, enableHttpPipelining, nodePrefix, Collections.emptyList(), Function.identity()); InternalTestCluster cluster0 = new InternalTestCluster("local", clusterSeed, baseDir, masterNodes,
InternalTestCluster cluster1 = new InternalTestCluster("local", clusterSeed, baseDir, minNumDataNodes, maxNumDataNodes, clusterName2, nodeConfigurationSource, numClientNodes, enableHttpPipelining, nodePrefix, Collections.emptyList(), Function.identity()); minNumDataNodes, maxNumDataNodes, clusterName1, nodeConfigurationSource, numClientNodes,
enableHttpPipelining, nodePrefix, Collections.emptyList(), Function.identity());
InternalTestCluster cluster1 = new InternalTestCluster("local", clusterSeed, baseDir, masterNodes,
minNumDataNodes, maxNumDataNodes, clusterName2, nodeConfigurationSource, numClientNodes,
enableHttpPipelining, nodePrefix, Collections.emptyList(), Function.identity());
assertClusters(cluster0, cluster1, false); assertClusters(cluster0, cluster1, false);
long seed = randomLong(); long seed = randomLong();