From 30d7faeba29362e0116532e6f71c34342c717709 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Fri, 20 Sep 2013 13:25:12 +0200 Subject: [PATCH] Cut over more tests to AbstractIntegrationTest --- .../cluster/AbstractZenNodesTests.java | 39 --- .../cluster/ClusterServiceTests.java | 69 ++--- .../cluster/MinimumMasterNodesTests.java | 207 +++++-------- .../cluster/SpecificMasterNodesTests.java | 63 ++-- .../ShardsAllocatorModuleTests.java | 24 +- .../test/AbstractIntegrationTest.java | 18 +- .../test/ElasticSearchTestCase.java | 4 +- .../org/elasticsearch/test/TestCluster.java | 278 +++++++++++++----- .../validate/SimpleValidateQueryTests.java | 4 +- 9 files changed, 378 insertions(+), 328 deletions(-) delete mode 100644 src/test/java/org/elasticsearch/cluster/AbstractZenNodesTests.java diff --git a/src/test/java/org/elasticsearch/cluster/AbstractZenNodesTests.java b/src/test/java/org/elasticsearch/cluster/AbstractZenNodesTests.java deleted file mode 100644 index 27f35145d95..00000000000 --- a/src/test/java/org/elasticsearch/cluster/AbstractZenNodesTests.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to Elastic Search and Shay Banon under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. Elastic Search licenses this - * file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.cluster; - -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.test.AbstractNodesTests; - -import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder; - -/** - * - */ -public abstract class AbstractZenNodesTests extends AbstractNodesTests { - - @Override - protected final Settings getClassDefaultSettings() { - // we force zen discovery here since it has specific handling for specific master / data nodes - // and disconnections - return settingsBuilder().put("discovery.type", "zen").build(); - } - -} \ No newline at end of file diff --git a/src/test/java/org/elasticsearch/cluster/ClusterServiceTests.java b/src/test/java/org/elasticsearch/cluster/ClusterServiceTests.java index 2af3f1a81b4..3ae16a8a0df 100644 --- a/src/test/java/org/elasticsearch/cluster/ClusterServiceTests.java +++ b/src/test/java/org/elasticsearch/cluster/ClusterServiceTests.java @@ -22,7 +22,6 @@ package org.elasticsearch.cluster; import org.elasticsearch.ElasticSearchException; import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.admin.cluster.tasks.PendingClusterTasksResponse; -import org.elasticsearch.client.Client; import org.elasticsearch.cluster.service.PendingClusterTask; import org.elasticsearch.common.Priority; import org.elasticsearch.common.component.AbstractLifecycleComponent; @@ -31,10 +30,11 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Singleton; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.node.internal.InternalNode; import org.elasticsearch.plugins.AbstractPlugin; +import org.elasticsearch.test.AbstractIntegrationTest; +import org.elasticsearch.test.AbstractIntegrationTest.ClusterScope; +import org.elasticsearch.test.AbstractIntegrationTest.Scope; import org.elasticsearch.threadpool.ThreadPool; -import org.junit.After; import org.junit.Test; import java.util.*; @@ -48,23 +48,19 @@ import static org.hamcrest.Matchers.*; /** * */ -public class ClusterServiceTests extends AbstractZenNodesTests { - - @After - public void closeNodes() { - closeAllNodes(); - } +@ClusterScope(scope = Scope.TEST, numNodes=0) +public class ClusterServiceTests extends AbstractIntegrationTest { @Test public void testTimeoutUpdateTask() throws Exception { Settings settings = settingsBuilder() + .put("discovery.type", "zen") .put("discovery.zen.minimum_master_nodes", 1) .put("discovery.zen.ping_timeout", "200ms") .put("discovery.initial_state_timeout", "500ms") .build(); - - InternalNode node1 = (InternalNode) startNode("node1", settings); - ClusterService clusterService1 = node1.injector().getInstance(ClusterService.class); + cluster().startNode(settings); + ClusterService clusterService1 = cluster().getInstance(ClusterService.class); final CountDownLatch block = new CountDownLatch(1); clusterService1.submitStateUpdateTask("test1", new ClusterStateUpdateTask() { @Override @@ -115,10 +111,13 @@ public class ClusterServiceTests extends AbstractZenNodesTests { @Test public void testPendingUpdateTask() throws Exception { - InternalNode node1 = (InternalNode) startNode("node1"); - Client client = startNode("client-node", settingsBuilder().put("node.client", true).build()).client(); + Settings zenSettings = settingsBuilder() + .put("discovery.type", "zen").build(); + String node_0 = cluster().startNode(zenSettings); + cluster().startNodeClient(zenSettings); + - ClusterService clusterService = node1.injector().getInstance(ClusterService.class); + ClusterService clusterService = cluster().getInstance(ClusterService.class, node_0); final CountDownLatch block1 = new CountDownLatch(1); final CountDownLatch invoked1 = new CountDownLatch(1); clusterService.submitStateUpdateTask("1", new ClusterStateUpdateTask() { @@ -166,7 +165,7 @@ public class ClusterServiceTests extends AbstractZenNodesTests { assertTrue(controlSources.isEmpty()); controlSources = new HashSet(Arrays.asList("2", "3", "4", "5", "6", "7", "8", "9", "10")); - PendingClusterTasksResponse response = client.admin().cluster().preparePendingClusterTasks().execute().actionGet(); + PendingClusterTasksResponse response = cluster().clientNodeClient().admin().cluster().preparePendingClusterTasks().execute().actionGet(); assertThat(response.pendingTasks().size(), equalTo(9)); for (PendingClusterTask task : response) { assertTrue(controlSources.remove(task.source().string())); @@ -177,7 +176,7 @@ public class ClusterServiceTests extends AbstractZenNodesTests { pendingClusterTasks = clusterService.pendingTasks(); assertThat(pendingClusterTasks, empty()); - response = client.admin().cluster().preparePendingClusterTasks().execute().actionGet(); + response = cluster().clientNodeClient().admin().cluster().preparePendingClusterTasks().execute().actionGet(); assertThat(response.pendingTasks(), empty()); final CountDownLatch block2 = new CountDownLatch(1); @@ -225,7 +224,7 @@ public class ClusterServiceTests extends AbstractZenNodesTests { } assertTrue(controlSources.isEmpty()); - response = client.admin().cluster().preparePendingClusterTasks().execute().actionGet(); + response = cluster().clientNodeClient().admin().cluster().preparePendingClusterTasks().execute().actionGet(); assertThat(response.pendingTasks().size(), equalTo(4)); controlSources = new HashSet(Arrays.asList("2", "3", "4", "5")); for (PendingClusterTask task : response) { @@ -239,34 +238,35 @@ public class ClusterServiceTests extends AbstractZenNodesTests { @Test public void testListenerCallbacks() throws Exception { Settings settings = settingsBuilder() + .put("discovery.type", "zen") .put("discovery.zen.minimum_master_nodes", 1) .put("discovery.zen.ping_timeout", "200ms") .put("discovery.initial_state_timeout", "500ms") .put("plugin.types", TestPlugin.class.getName()) .build(); - InternalNode node1 = (InternalNode) startNode("node1", settings); - ClusterService clusterService1 = node1.injector().getInstance(ClusterService.class); - MasterAwareService testService1 = node1.injector().getInstance(MasterAwareService.class); + cluster().startNode(settings); + ClusterService clusterService1 = cluster().getInstance(ClusterService.class); + MasterAwareService testService1 = cluster().getInstance(MasterAwareService.class); // the first node should be a master as the minimum required is 1 assertThat(clusterService1.state().nodes().masterNode(), notNullValue()); assertThat(clusterService1.state().nodes().localNodeMaster(), is(true)); assertThat(testService1.master(), is(true)); - InternalNode node2 = (InternalNode) startNode("node2", settings); - ClusterService clusterService2 = node2.injector().getInstance(ClusterService.class); - MasterAwareService testService2 = node2.injector().getInstance(MasterAwareService.class); + String node_1 = cluster().startNode(settings); + ClusterService clusterService2 = cluster().getInstance(ClusterService.class, node_1); + MasterAwareService testService2 = cluster().getInstance(MasterAwareService.class, node_1); - ClusterHealthResponse clusterHealth = node2.client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForNodes("2").execute().actionGet(); + ClusterHealthResponse clusterHealth = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForNodes("2").execute().actionGet(); assertThat(clusterHealth.isTimedOut(), equalTo(false)); // the second node should not be the master as node1 is already the master. assertThat(clusterService2.state().nodes().localNodeMaster(), is(false)); assertThat(testService2.master(), is(false)); - closeNode("node1"); - clusterHealth = node2.client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForNodes("1").execute().actionGet(); + cluster().stopCurrentMasterNode(); + clusterHealth = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForNodes("1").execute().actionGet(); assertThat(clusterHealth.isTimedOut(), equalTo(false)); // now that node1 is closed, node2 should be elected as master @@ -275,8 +275,9 @@ public class ClusterServiceTests extends AbstractZenNodesTests { Settings newSettings = settingsBuilder() .put("discovery.zen.minimum_master_nodes", 2) + .put("discovery.type", "zen") .build(); - node2.client().admin().cluster().prepareUpdateSettings().setTransientSettings(newSettings).execute().actionGet(); + client().admin().cluster().prepareUpdateSettings().setTransientSettings(newSettings).execute().actionGet(); Thread.sleep(200); // there should not be any master as the minimum number of required eligible masters is not met @@ -284,17 +285,17 @@ public class ClusterServiceTests extends AbstractZenNodesTests { assertThat(testService2.master(), is(false)); - node1 = (InternalNode) startNode("node1", settings); - clusterService1 = node1.injector().getInstance(ClusterService.class); - testService1 = node1.injector().getInstance(MasterAwareService.class); + String node_2 = cluster().startNode(settings); + clusterService1 =cluster().getInstance(ClusterService.class, node_2); + testService1 = cluster().getInstance(MasterAwareService.class, node_2); // make sure both nodes see each other otherwise the masternode below could be null if node 2 is master and node 1 did'r receive the updated cluster state... - assertThat(node2.client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setLocal(true).setWaitForNodes("2").execute().actionGet().isTimedOut(), is(false)); - assertThat(node1.client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setLocal(true).setWaitForNodes("2").execute().actionGet().isTimedOut(), is(false)); + assertThat(client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setLocal(true).setWaitForNodes("2").execute().actionGet().isTimedOut(), is(false)); + assertThat(client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setLocal(true).setWaitForNodes("2").execute().actionGet().isTimedOut(), is(false)); // now that we started node1 again, a new master should be elected assertThat(clusterService1.state().nodes().masterNode(), is(notNullValue())); - if ("node1".equals(clusterService1.state().nodes().masterNode().name())) { + if (node_2.equals(clusterService1.state().nodes().masterNode().name())) { assertThat(testService1.master(), is(true)); assertThat(testService2.master(), is(false)); } else { diff --git a/src/test/java/org/elasticsearch/cluster/MinimumMasterNodesTests.java b/src/test/java/org/elasticsearch/cluster/MinimumMasterNodesTests.java index d9075e4460d..62a9b937a18 100644 --- a/src/test/java/org/elasticsearch/cluster/MinimumMasterNodesTests.java +++ b/src/test/java/org/elasticsearch/cluster/MinimumMasterNodesTests.java @@ -20,52 +20,29 @@ package org.elasticsearch.cluster; import com.google.common.base.Predicate; -import com.google.common.collect.Sets; import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus; -import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.client.Client; import org.elasticsearch.common.Priority; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.discovery.Discovery; -import org.elasticsearch.env.NodeEnvironment; -import org.elasticsearch.gateway.Gateway; import org.elasticsearch.index.query.QueryBuilders; -import org.elasticsearch.node.internal.InternalNode; -import org.junit.After; +import org.elasticsearch.test.AbstractIntegrationTest; +import org.elasticsearch.test.AbstractIntegrationTest.ClusterScope; +import org.elasticsearch.test.AbstractIntegrationTest.Scope; import org.junit.Test; -import java.util.LinkedList; -import java.util.Set; import java.util.concurrent.TimeUnit; import static org.elasticsearch.client.Requests.clusterHealthRequest; import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder; import static org.hamcrest.Matchers.equalTo; -public class MinimumMasterNodesTests extends AbstractZenNodesTests { - - @After - public void cleanAndCloseNodes() throws Exception { - super.tearDown(); - for (int i = 0; i < 10; i++) { - if (node("node" + i) != null) { - node("node" + i).stop(); - // since we store (by default) the index snapshot under the gateway, resetting it will reset the index data as well - if (((InternalNode) node("node" + i)).injector().getInstance(NodeEnvironment.class).hasNodeFile()) { - ((InternalNode) node("node" + i)).injector().getInstance(Gateway.class).reset(); - } - } - } - closeAllNodes(); - } +@ClusterScope(scope = Scope.TEST, numNodes=0) +public class MinimumMasterNodesTests extends AbstractIntegrationTest { @Test public void simpleMinimumMasterNodes() throws Exception { - logger.info("--> cleaning nodes"); - buildNode("node1", settingsBuilder().put("gateway.type", "local")); - buildNode("node2", settingsBuilder().put("gateway.type", "local")); - cleanAndCloseNodes(); - Settings settings = settingsBuilder() .put("discovery.type", "zen") @@ -77,135 +54,116 @@ public class MinimumMasterNodesTests extends AbstractZenNodesTests { .build(); logger.info("--> start first node"); - startNode("node1", settings); + cluster().startNode(settings); logger.info("--> should be blocked, no master..."); - ClusterState state = client("node1").admin().cluster().prepareState().setLocal(true).execute().actionGet().getState(); + ClusterState state = client().admin().cluster().prepareState().setLocal(true).execute().actionGet().getState(); assertThat(state.blocks().hasGlobalBlock(Discovery.NO_MASTER_BLOCK), equalTo(true)); logger.info("--> start second node, cluster should be formed"); - startNode("node2", settings); + cluster().startNode(settings); - ClusterHealthResponse clusterHealthResponse = client("node1").admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForNodes("2").execute().actionGet(); + ClusterHealthResponse clusterHealthResponse = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForNodes("2").execute().actionGet(); assertThat(clusterHealthResponse.isTimedOut(), equalTo(false)); - state = client("node1").admin().cluster().prepareState().setLocal(true).execute().actionGet().getState(); + state = client().admin().cluster().prepareState().setLocal(true).execute().actionGet().getState(); assertThat(state.blocks().hasGlobalBlock(Discovery.NO_MASTER_BLOCK), equalTo(false)); - state = client("node2").admin().cluster().prepareState().setLocal(true).execute().actionGet().getState(); + state = client().admin().cluster().prepareState().setLocal(true).execute().actionGet().getState(); assertThat(state.blocks().hasGlobalBlock(Discovery.NO_MASTER_BLOCK), equalTo(false)); - state = client("node1").admin().cluster().prepareState().execute().actionGet().getState(); + state = client().admin().cluster().prepareState().execute().actionGet().getState(); assertThat(state.nodes().size(), equalTo(2)); assertThat(state.metaData().indices().containsKey("test"), equalTo(false)); - client("node1").admin().indices().prepareCreate("test").execute().actionGet(); + client().admin().indices().prepareCreate("test").execute().actionGet(); logger.info("--> indexing some data"); for (int i = 0; i < 100; i++) { - client("node1").prepareIndex("test", "type1", Integer.toString(i)).setSource("field", "value").execute().actionGet(); + client().prepareIndex("test", "type1", Integer.toString(i)).setSource("field", "value").execute().actionGet(); } // make sure that all shards recovered before trying to flush - assertThat(client("node1").admin().cluster().prepareHealth("test").setWaitForActiveShards(2).execute().actionGet().getActiveShards(), equalTo(2)); + assertThat(client().admin().cluster().prepareHealth("test").setWaitForActiveShards(2).execute().actionGet().getActiveShards(), equalTo(2)); // flush for simpler debugging - client("node1").admin().indices().prepareFlush().execute().actionGet(); + client().admin().indices().prepareFlush().execute().actionGet(); - client("node1").admin().indices().prepareRefresh().execute().actionGet(); + client().admin().indices().prepareRefresh().execute().actionGet(); logger.info("--> verify we the data back"); for (int i = 0; i < 10; i++) { - assertThat(client("node1").prepareCount().setQuery(QueryBuilders.matchAllQuery()).execute().actionGet().getCount(), equalTo(100l)); + assertThat(client().prepareCount().setQuery(QueryBuilders.matchAllQuery()).execute().actionGet().getCount(), equalTo(100l)); } - String masterNodeName = state.nodes().masterNode().name(); - String nonMasterNodeName = masterNodeName.equals("node1") ? "node2" : "node1"; - logger.info("--> closing master node {}", masterNodeName); - closeNode(masterNodeName); - final String noMasterNode = nonMasterNodeName; + cluster().stopCurrentMasterNode(); awaitBusy(new Predicate() { public boolean apply(Object obj) { - ClusterState state = client(noMasterNode).admin().cluster().prepareState().setLocal(true).execute().actionGet().getState(); + ClusterState state = client().admin().cluster().prepareState().setLocal(true).execute().actionGet().getState(); return state.blocks().hasGlobalBlock(Discovery.NO_MASTER_BLOCK); } }); - state = client(nonMasterNodeName).admin().cluster().prepareState().setLocal(true).execute().actionGet().getState(); + state = client().admin().cluster().prepareState().setLocal(true).execute().actionGet().getState(); assertThat(state.blocks().hasGlobalBlock(Discovery.NO_MASTER_BLOCK), equalTo(true)); logger.info("--> starting the previous master node again..."); - startNode(masterNodeName, settings); + cluster().startNode(settings); - clusterHealthResponse = client("node1").admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForYellowStatus().setWaitForNodes("2").execute().actionGet(); + clusterHealthResponse = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForYellowStatus().setWaitForNodes("2").execute().actionGet(); assertThat(clusterHealthResponse.isTimedOut(), equalTo(false)); - state = client("node1").admin().cluster().prepareState().setLocal(true).execute().actionGet().getState(); + state = client().admin().cluster().prepareState().setLocal(true).execute().actionGet().getState(); assertThat(state.blocks().hasGlobalBlock(Discovery.NO_MASTER_BLOCK), equalTo(false)); - state = client("node2").admin().cluster().prepareState().setLocal(true).execute().actionGet().getState(); + state = client().admin().cluster().prepareState().setLocal(true).execute().actionGet().getState(); assertThat(state.blocks().hasGlobalBlock(Discovery.NO_MASTER_BLOCK), equalTo(false)); - state = client("node1").admin().cluster().prepareState().execute().actionGet().getState(); + state = client().admin().cluster().prepareState().execute().actionGet().getState(); assertThat(state.nodes().size(), equalTo(2)); assertThat(state.metaData().indices().containsKey("test"), equalTo(true)); logger.info("Running Cluster Health"); - ClusterHealthResponse clusterHealth = client("node1").admin().cluster().health(clusterHealthRequest().waitForGreenStatus()).actionGet(); + ClusterHealthResponse clusterHealth = client().admin().cluster().health(clusterHealthRequest().waitForGreenStatus()).actionGet(); logger.info("Done Cluster Health, status " + clusterHealth.getStatus()); assertThat(clusterHealth.isTimedOut(), equalTo(false)); assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.GREEN)); logger.info("--> verify we the data back"); for (int i = 0; i < 10; i++) { - assertThat(client("node1").prepareCount().setQuery(QueryBuilders.matchAllQuery()).execute().actionGet().getCount(), equalTo(100l)); + assertThat(client().prepareCount().setQuery(QueryBuilders.matchAllQuery()).execute().actionGet().getCount(), equalTo(100l)); } - masterNodeName = state.nodes().masterNode().name(); - nonMasterNodeName = masterNodeName.equals("node1") ? "node2" : "node1"; - logger.info("--> closing non master node {}", nonMasterNodeName); - closeNode(nonMasterNodeName); - - final String masterNode = masterNodeName; - awaitBusy(new Predicate() { + cluster().stopRandomNonMasterNode(); + assertThat(awaitBusy(new Predicate() { public boolean apply(Object obj) { - ClusterState state = client(masterNode).admin().cluster().prepareState().setLocal(true).execute().actionGet().getState(); + ClusterState state = client().admin().cluster().prepareState().setLocal(true).execute().actionGet().getState(); return state.blocks().hasGlobalBlock(Discovery.NO_MASTER_BLOCK); } - }); - state = client(masterNodeName).admin().cluster().prepareState().setLocal(true).execute().actionGet().getState(); - assertThat(state.blocks().hasGlobalBlock(Discovery.NO_MASTER_BLOCK), equalTo(true)); + }), equalTo(true)); logger.info("--> starting the previous master node again..."); - startNode(nonMasterNodeName, settings); + cluster().startNode(settings); - clusterHealthResponse = client("node1").admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForNodes("2").setWaitForGreenStatus().execute().actionGet(); + clusterHealthResponse = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForNodes("2").setWaitForGreenStatus().execute().actionGet(); assertThat(clusterHealthResponse.isTimedOut(), equalTo(false)); - state = client("node1").admin().cluster().prepareState().setLocal(true).execute().actionGet().getState(); + state = client().admin().cluster().prepareState().setLocal(true).execute().actionGet().getState(); assertThat(state.blocks().hasGlobalBlock(Discovery.NO_MASTER_BLOCK), equalTo(false)); - state = client("node2").admin().cluster().prepareState().setLocal(true).execute().actionGet().getState(); + state = client().admin().cluster().prepareState().setLocal(true).execute().actionGet().getState(); assertThat(state.blocks().hasGlobalBlock(Discovery.NO_MASTER_BLOCK), equalTo(false)); - state = client("node1").admin().cluster().prepareState().execute().actionGet().getState(); + state = client().admin().cluster().prepareState().execute().actionGet().getState(); assertThat(state.nodes().size(), equalTo(2)); assertThat(state.metaData().indices().containsKey("test"), equalTo(true)); logger.info("Running Cluster Health"); - clusterHealth = client("node1").admin().cluster().health(clusterHealthRequest().waitForGreenStatus()).actionGet(); + clusterHealth = client().admin().cluster().health(clusterHealthRequest().waitForGreenStatus()).actionGet(); logger.info("Done Cluster Health, status " + clusterHealth.getStatus()); assertThat(clusterHealth.isTimedOut(), equalTo(false)); assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.GREEN)); logger.info("--> verify we the data back"); for (int i = 0; i < 10; i++) { - assertThat(client("node1").prepareCount().setQuery(QueryBuilders.matchAllQuery()).execute().actionGet().getCount(), equalTo(100l)); + assertThat(client().prepareCount().setQuery(QueryBuilders.matchAllQuery()).execute().actionGet().getCount(), equalTo(100l)); } } @Test public void multipleNodesShutdownNonMasterNodes() throws Exception { - logger.info("--> cleaning nodes"); - buildNode("node1", settingsBuilder().put("gateway.type", "local")); - buildNode("node2", settingsBuilder().put("gateway.type", "local")); - buildNode("node3", settingsBuilder().put("gateway.type", "local")); - buildNode("node4", settingsBuilder().put("gateway.type", "local")); - cleanAndCloseNodes(); - - Settings settings = settingsBuilder() .put("discovery.type", "zen") .put("discovery.zen.minimum_master_nodes", 3) @@ -215,108 +173,93 @@ public class MinimumMasterNodesTests extends AbstractZenNodesTests { .build(); logger.info("--> start first 2 nodes"); - startNode("node1", settings); - startNode("node2", settings); + cluster().startNode(settings); + cluster().startNode(settings); ClusterState state; awaitBusy(new Predicate() { public boolean apply(Object obj) { - ClusterState state = client("node1").admin().cluster().prepareState().setLocal(true).execute().actionGet().getState(); + ClusterState state = client().admin().cluster().prepareState().setLocal(true).execute().actionGet().getState(); return state.blocks().hasGlobalBlock(Discovery.NO_MASTER_BLOCK); } }); awaitBusy(new Predicate() { public boolean apply(Object obj) { - ClusterState state = client("node2").admin().cluster().prepareState().setLocal(true).execute().actionGet().getState(); + ClusterState state = client().admin().cluster().prepareState().setLocal(true).execute().actionGet().getState(); return state.blocks().hasGlobalBlock(Discovery.NO_MASTER_BLOCK); } }); - state = client("node1").admin().cluster().prepareState().setLocal(true).execute().actionGet().getState(); + state = client().admin().cluster().prepareState().setLocal(true).execute().actionGet().getState(); assertThat(state.blocks().hasGlobalBlock(Discovery.NO_MASTER_BLOCK), equalTo(true)); - state = client("node2").admin().cluster().prepareState().setLocal(true).execute().actionGet().getState(); + state = client().admin().cluster().prepareState().setLocal(true).execute().actionGet().getState(); assertThat(state.blocks().hasGlobalBlock(Discovery.NO_MASTER_BLOCK), equalTo(true)); logger.info("--> start two more nodes"); - startNode("node3", settings); - startNode("node4", settings); + cluster().startNode(settings); + cluster().startNode(settings); - ClusterHealthResponse clusterHealthResponse = client("node1").admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForNodes("4").execute().actionGet(); + ClusterHealthResponse clusterHealthResponse = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForNodes("4").execute().actionGet(); assertThat(clusterHealthResponse.isTimedOut(), equalTo(false)); - state = client("node1").admin().cluster().prepareState().execute().actionGet().getState(); + state = client().admin().cluster().prepareState().execute().actionGet().getState(); assertThat(state.nodes().size(), equalTo(4)); - final String masterNode = state.nodes().masterNode().name(); - LinkedList nonMasterNodes = new LinkedList(); - for (DiscoveryNode node : state.nodes()) { - if (!node.name().equals(masterNode)) { - nonMasterNodes.add(node.name()); - } - } logger.info("--> indexing some data"); for (int i = 0; i < 100; i++) { - client("node1").prepareIndex("test", "type1", Integer.toString(i)).setSource("field", "value").execute().actionGet(); + client().prepareIndex("test", "type1", Integer.toString(i)).setSource("field", "value").execute().actionGet(); } // make sure that all shards recovered before trying to flush - assertThat(client("node1").admin().cluster().prepareHealth("test").setWaitForActiveShards(10).execute().actionGet().isTimedOut(), equalTo(false)); + assertThat(client().admin().cluster().prepareHealth("test").setWaitForActiveShards(10).execute().actionGet().isTimedOut(), equalTo(false)); // flush for simpler debugging - client("node1").admin().indices().prepareFlush().execute().actionGet(); + client().admin().indices().prepareFlush().execute().actionGet(); - client("node1").admin().indices().prepareRefresh().execute().actionGet(); + client().admin().indices().prepareRefresh().execute().actionGet(); logger.info("--> verify we the data back"); for (int i = 0; i < 10; i++) { - assertThat(client("node1").prepareCount().setQuery(QueryBuilders.matchAllQuery()).execute().actionGet().getCount(), equalTo(100l)); + assertThat(client().prepareCount().setQuery(QueryBuilders.matchAllQuery()).execute().actionGet().getCount(), equalTo(100l)); } - Set nodesToShutdown = Sets.newHashSet(); - nodesToShutdown.add(nonMasterNodes.removeLast()); - nodesToShutdown.add(nonMasterNodes.removeLast()); - logger.info("--> shutting down two master nodes {}", nodesToShutdown); - for (String nodeToShutdown : nodesToShutdown) { - closeNode(nodeToShutdown); - } + cluster().stopRandomNonMasterNode(); + cluster().stopRandomNonMasterNode(); - final String lastNonMasterNodeUp = nonMasterNodes.removeLast(); logger.info("--> verify that there is no master anymore on remaining nodes"); // spin here to wait till the state is set - awaitBusy(new Predicate() { + assertThat(awaitBusy(new Predicate() { public boolean apply(Object obj) { - ClusterState state = client(masterNode).admin().cluster().prepareState().setLocal(true).execute().actionGet().getState(); - boolean firstNoMasterLock = state.blocks().hasGlobalBlock(Discovery.NO_MASTER_BLOCK); - state = client(lastNonMasterNodeUp).admin().cluster().prepareState().setLocal(true).execute().actionGet().getState(); - boolean secondNoMasterLock = state.blocks().hasGlobalBlock(Discovery.NO_MASTER_BLOCK); - return firstNoMasterLock && secondNoMasterLock; + boolean success = true; + for(Client client : cluster()) { + ClusterState state = client.admin().cluster().prepareState().setLocal(true).execute().actionGet().getState(); + success &= state.blocks().hasGlobalBlock(Discovery.NO_MASTER_BLOCK); + if (logger.isDebugEnabled()) { + logger.debug("Checking for NO_MASTER_BLOCL on client: {} NO_MASTER_BLOCK: [{}]", client, state.blocks().hasGlobalBlock(Discovery.NO_MASTER_BLOCK)); + } + } + return success; } - }, 20, TimeUnit.SECONDS); - - state = client(masterNode).admin().cluster().prepareState().setLocal(true).execute().actionGet().getState(); - assertThat(state.blocks().hasGlobalBlock(Discovery.NO_MASTER_BLOCK), equalTo(true)); - state = client(lastNonMasterNodeUp).admin().cluster().prepareState().setLocal(true).execute().actionGet().getState(); - assertThat(state.blocks().hasGlobalBlock(Discovery.NO_MASTER_BLOCK), equalTo(true)); + }, 20, TimeUnit.SECONDS), equalTo(true)); - logger.info("--> start back the nodes {}", nodesToShutdown); - for (String nodeToShutdown : nodesToShutdown) { - startNode(nodeToShutdown, settings); - } + logger.info("--> start back the 2 nodes "); + cluster().startNode(settings); + cluster().startNode(settings); - clusterHealthResponse = client("node1").admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForNodes("4").execute().actionGet(); + clusterHealthResponse = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForNodes("4").execute().actionGet(); assertThat(clusterHealthResponse.isTimedOut(), equalTo(false)); logger.info("Running Cluster Health"); - ClusterHealthResponse clusterHealth = client("node1").admin().cluster().health(clusterHealthRequest().waitForGreenStatus()).actionGet(); + ClusterHealthResponse clusterHealth = client().admin().cluster().health(clusterHealthRequest().waitForGreenStatus()).actionGet(); logger.info("Done Cluster Health, status " + clusterHealth.getStatus()); assertThat(clusterHealth.isTimedOut(), equalTo(false)); assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.GREEN)); - state = client("node1").admin().cluster().prepareState().execute().actionGet().getState(); + state = client().admin().cluster().prepareState().execute().actionGet().getState(); assertThat(state.nodes().size(), equalTo(4)); logger.info("--> verify we the data back"); for (int i = 0; i < 10; i++) { - assertThat(client("node1").prepareCount().setQuery(QueryBuilders.matchAllQuery()).execute().actionGet().getCount(), equalTo(100l)); + assertThat(client().prepareCount().setQuery(QueryBuilders.matchAllQuery()).execute().actionGet().getCount(), equalTo(100l)); } } } diff --git a/src/test/java/org/elasticsearch/cluster/SpecificMasterNodesTests.java b/src/test/java/org/elasticsearch/cluster/SpecificMasterNodesTests.java index 654a5790290..e4b6f67ac74 100644 --- a/src/test/java/org/elasticsearch/cluster/SpecificMasterNodesTests.java +++ b/src/test/java/org/elasticsearch/cluster/SpecificMasterNodesTests.java @@ -19,83 +19,80 @@ package org.elasticsearch.cluster; +import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.discovery.MasterNotDiscoveredException; -import org.junit.After; +import org.elasticsearch.test.AbstractIntegrationTest; +import org.elasticsearch.test.AbstractIntegrationTest.ClusterScope; +import org.elasticsearch.test.AbstractIntegrationTest.Scope; import org.junit.Test; -import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.nullValue; /** * */ -public class SpecificMasterNodesTests extends AbstractZenNodesTests { +@ClusterScope(scope = Scope.TEST, numNodes=0) +public class SpecificMasterNodesTests extends AbstractIntegrationTest { - @After - public void closeNodes() { - closeAllNodes(); + protected final ImmutableSettings.Builder settingsBuilder() { + return ImmutableSettings.builder().put("discovery.type", "zen"); } - @Test public void simpleOnlyMasterNodeElection() { logger.info("--> start data node / non master node"); - startNode("data1", settingsBuilder().put("node.data", true).put("node.master", false).put("discovery.initial_state_timeout", "1s")); + cluster().startNode(settingsBuilder().put("node.data", true).put("node.master", false).put("discovery.initial_state_timeout", "1s")); try { - assertThat(client("data1").admin().cluster().prepareState().setMasterNodeTimeout("100ms").execute().actionGet().getState().nodes().masterNodeId(), nullValue()); + assertThat(client().admin().cluster().prepareState().setMasterNodeTimeout("100ms").execute().actionGet().getState().nodes().masterNodeId(), nullValue()); assert false : "should not be able to find master"; } catch (MasterNotDiscoveredException e) { // all is well, no master elected } logger.info("--> start master node"); - startNode("master1", settingsBuilder().put("node.data", false).put("node.master", true)); - assertThat(client("data1").admin().cluster().prepareState().execute().actionGet().getState().nodes().masterNode().name(), equalTo("master1")); - assertThat(client("master1").admin().cluster().prepareState().execute().actionGet().getState().nodes().masterNode().name(), equalTo("master1")); + final String masterNodeName = cluster().startNode(settingsBuilder().put("node.data", false).put("node.master", true)); + assertThat(cluster().nonMasterClient().admin().cluster().prepareState().execute().actionGet().getState().nodes().masterNode().name(), equalTo(masterNodeName)); + assertThat(cluster().masterClient().admin().cluster().prepareState().execute().actionGet().getState().nodes().masterNode().name(), equalTo(masterNodeName)); logger.info("--> stop master node"); - closeNode("master1"); + cluster().stopCurrentMasterNode(); try { - assertThat(client("data1").admin().cluster().prepareState().setMasterNodeTimeout("100ms").execute().actionGet().getState().nodes().masterNodeId(), nullValue()); + assertThat(client().admin().cluster().prepareState().setMasterNodeTimeout("100ms").execute().actionGet().getState().nodes().masterNodeId(), nullValue()); assert false : "should not be able to find master"; } catch (MasterNotDiscoveredException e) { // all is well, no master elected } logger.info("--> start master node"); - startNode("master1", settingsBuilder().put("node.data", false).put("node.master", true)); - assertThat(client("data1").admin().cluster().prepareState().execute().actionGet().getState().nodes().masterNode().name(), equalTo("master1")); - assertThat(client("master1").admin().cluster().prepareState().execute().actionGet().getState().nodes().masterNode().name(), equalTo("master1")); - - logger.info("--> stop all nodes"); - closeNode("data1"); - closeNode("master1"); + final String nextMasterEligableNodeName = cluster().startNode(settingsBuilder().put("node.data", false).put("node.master", true)); + assertThat(cluster().nonMasterClient().admin().cluster().prepareState().execute().actionGet().getState().nodes().masterNode().name(), equalTo(nextMasterEligableNodeName)); + assertThat(cluster().masterClient().admin().cluster().prepareState().execute().actionGet().getState().nodes().masterNode().name(), equalTo(nextMasterEligableNodeName)); } @Test public void electOnlyBetweenMasterNodes() { logger.info("--> start data node / non master node"); - startNode("data1", settingsBuilder().put("node.data", true).put("node.master", false).put("discovery.initial_state_timeout", "1s")); + cluster().startNode(settingsBuilder().put("node.data", true).put("node.master", false).put("discovery.initial_state_timeout", "1s")); try { - assertThat(client("data1").admin().cluster().prepareState().setMasterNodeTimeout("100ms").execute().actionGet().getState().nodes().masterNodeId(), nullValue()); + assertThat(client().admin().cluster().prepareState().setMasterNodeTimeout("100ms").execute().actionGet().getState().nodes().masterNodeId(), nullValue()); assert false : "should not be able to find master"; } catch (MasterNotDiscoveredException e) { // all is well, no master elected } logger.info("--> start master node (1)"); - startNode("master1", settingsBuilder().put("node.data", false).put("node.master", true)); - assertThat(client("data1").admin().cluster().prepareState().execute().actionGet().getState().nodes().masterNode().name(), equalTo("master1")); - assertThat(client("master1").admin().cluster().prepareState().execute().actionGet().getState().nodes().masterNode().name(), equalTo("master1")); + final String masterNodeName = cluster().startNode(settingsBuilder().put("node.data", false).put("node.master", true)); + assertThat(cluster().nonMasterClient().admin().cluster().prepareState().execute().actionGet().getState().nodes().masterNode().name(), equalTo(masterNodeName)); + assertThat(cluster().masterClient().admin().cluster().prepareState().execute().actionGet().getState().nodes().masterNode().name(), equalTo(masterNodeName)); logger.info("--> start master node (2)"); - startNode("master2", settingsBuilder().put("node.data", false).put("node.master", true)); - assertThat(client("data1").admin().cluster().prepareState().execute().actionGet().getState().nodes().masterNode().name(), equalTo("master1")); - assertThat(client("master1").admin().cluster().prepareState().execute().actionGet().getState().nodes().masterNode().name(), equalTo("master1")); - assertThat(client("master2").admin().cluster().prepareState().execute().actionGet().getState().nodes().masterNode().name(), equalTo("master1")); + final String nextMasterEligableNodeName = cluster().startNode(settingsBuilder().put("node.data", false).put("node.master", true)); + assertThat(cluster().nonMasterClient().admin().cluster().prepareState().execute().actionGet().getState().nodes().masterNode().name(), equalTo(masterNodeName)); + assertThat(cluster().nonMasterClient().admin().cluster().prepareState().execute().actionGet().getState().nodes().masterNode().name(), equalTo(masterNodeName)); + assertThat(cluster().masterClient().admin().cluster().prepareState().execute().actionGet().getState().nodes().masterNode().name(), equalTo(masterNodeName)); logger.info("--> closing master node (1)"); - closeNode("master1"); - assertThat(client("data1").admin().cluster().prepareState().execute().actionGet().getState().nodes().masterNode().name(), equalTo("master2")); - assertThat(client("master2").admin().cluster().prepareState().execute().actionGet().getState().nodes().masterNode().name(), equalTo("master2")); + cluster().stopCurrentMasterNode(); + assertThat(cluster().nonMasterClient().admin().cluster().prepareState().execute().actionGet().getState().nodes().masterNode().name(), equalTo(nextMasterEligableNodeName)); + assertThat(cluster().masterClient().admin().cluster().prepareState().execute().actionGet().getState().nodes().masterNode().name(), equalTo(nextMasterEligableNodeName)); } } diff --git a/src/test/java/org/elasticsearch/cluster/allocation/ShardsAllocatorModuleTests.java b/src/test/java/org/elasticsearch/cluster/allocation/ShardsAllocatorModuleTests.java index ca787c8587b..4a842575d3c 100644 --- a/src/test/java/org/elasticsearch/cluster/allocation/ShardsAllocatorModuleTests.java +++ b/src/test/java/org/elasticsearch/cluster/allocation/ShardsAllocatorModuleTests.java @@ -25,20 +25,16 @@ import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocator; import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocatorModule; import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.node.Node; -import org.elasticsearch.node.internal.InternalNode; -import org.elasticsearch.test.AbstractNodesTests; -import org.junit.After; +import org.elasticsearch.test.AbstractIntegrationTest; +import org.elasticsearch.test.AbstractIntegrationTest.ClusterScope; +import org.elasticsearch.test.AbstractIntegrationTest.Scope; import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder; import static org.hamcrest.Matchers.instanceOf; -public class ShardsAllocatorModuleTests extends AbstractNodesTests { +@ClusterScope(scope=Scope.TEST, numNodes=0) +public class ShardsAllocatorModuleTests extends AbstractIntegrationTest { - @After - public void cleanAndCloseNodes() throws Exception { - closeAllNodes(); - } public void testLoadDefaultShardsAllocator() { assertAllocatorInstance(ImmutableSettings.Builder.EMPTY_SETTINGS, BalancedShardsAllocator.class); @@ -62,11 +58,11 @@ public class ShardsAllocatorModuleTests extends AbstractNodesTests { } private void assertAllocatorInstance(Settings settings, Class clazz) { - closeNode("node"); - Node _node = startNode("node", settings); - InternalNode node = (InternalNode) _node; - ShardsAllocator instance = node.injector().getInstance(ShardsAllocator.class); - node.close(); + while (cluster().numNodes() != 0) { + cluster().stopRandomNode(); + } + cluster().startNode(settings); + ShardsAllocator instance = cluster().getInstance(ShardsAllocator.class); assertThat(instance, instanceOf(clazz)); } } diff --git a/src/test/java/org/elasticsearch/test/AbstractIntegrationTest.java b/src/test/java/org/elasticsearch/test/AbstractIntegrationTest.java index 9a3f6a04a28..a939acec08c 100644 --- a/src/test/java/org/elasticsearch/test/AbstractIntegrationTest.java +++ b/src/test/java/org/elasticsearch/test/AbstractIntegrationTest.java @@ -171,6 +171,7 @@ public abstract class AbstractIntegrationTest extends ElasticSearchTestCase { ensureAllFilesClosed(); logger.info("[{}#{}]: cleaned up after test", getTestClass().getSimpleName(), getTestName()); } finally { + currentCluster.afterTest(); currentCluster = null; } } @@ -199,7 +200,7 @@ public abstract class AbstractIntegrationTest extends ElasticSearchTestCase { } public static Iterable clients() { - return cluster().clients(); + return cluster(); } public ImmutableSettings.Builder randomSettingsBuilder() { @@ -559,14 +560,25 @@ public abstract class AbstractIntegrationTest extends ElasticSearchTestCase { GLOBAL, SUITE, TEST; } + private ClusterScope getAnnotation(Class clazz) { + if (clazz == Object.class || clazz == AbstractIntegrationTest.class) { + return null; + } + ClusterScope annotation = clazz.getAnnotation(ClusterScope.class); + if (annotation != null) { + return annotation; + } + return getAnnotation(clazz.getSuperclass()); + } + private Scope getCurrentClusterScope() { - ClusterScope annotation = this.getClass().getAnnotation(ClusterScope.class); + ClusterScope annotation = getAnnotation(this.getClass()); // if we are not annotated assume global! return annotation == null ? Scope.GLOBAL : annotation.scope(); } private int getNumNodes() { - ClusterScope annotation = this.getClass().getAnnotation(ClusterScope.class); + ClusterScope annotation = getAnnotation(this.getClass()); return annotation == null ? -1 : annotation.numNodes(); } diff --git a/src/test/java/org/elasticsearch/test/ElasticSearchTestCase.java b/src/test/java/org/elasticsearch/test/ElasticSearchTestCase.java index cc59621b2d7..cdf51e05dc5 100644 --- a/src/test/java/org/elasticsearch/test/ElasticSearchTestCase.java +++ b/src/test/java/org/elasticsearch/test/ElasticSearchTestCase.java @@ -62,8 +62,8 @@ public abstract class ElasticSearchTestCase extends AbstractRandomizedTest { } - public void awaitBusy(Predicate breakPredicate) throws InterruptedException { - awaitBusy(breakPredicate, 10, TimeUnit.SECONDS); + public boolean awaitBusy(Predicate breakPredicate) throws InterruptedException { + return awaitBusy(breakPredicate, 10, TimeUnit.SECONDS); } public boolean awaitBusy(Predicate breakPredicate, long maxWaitTime, TimeUnit unit) throws InterruptedException { diff --git a/src/test/java/org/elasticsearch/test/TestCluster.java b/src/test/java/org/elasticsearch/test/TestCluster.java index 574fc494b6f..fcbff577ed9 100644 --- a/src/test/java/org/elasticsearch/test/TestCluster.java +++ b/src/test/java/org/elasticsearch/test/TestCluster.java @@ -21,6 +21,8 @@ package org.elasticsearch.test; import com.carrotsearch.randomizedtesting.SeedUtils; import com.google.common.base.Function; import com.google.common.base.Predicate; +import com.google.common.base.Predicates; +import com.google.common.collect.Collections2; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterators; import com.google.common.collect.Sets; @@ -28,11 +30,13 @@ import org.apache.lucene.util.IOUtils; import org.elasticsearch.client.Client; import org.elasticsearch.client.transport.TransportClient; import org.elasticsearch.cluster.ClusterService; +import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.block.ClusterBlock; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.common.io.FileSystemUtils; import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.network.NetworkUtils; @@ -41,6 +45,7 @@ import org.elasticsearch.common.settings.ImmutableSettings.Builder; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.gateway.Gateway; import org.elasticsearch.index.store.mock.MockFSIndexStoreModule; import org.elasticsearch.node.Node; @@ -48,6 +53,7 @@ import org.elasticsearch.node.internal.InternalNode; import org.elasticsearch.transport.TransportService; import java.io.Closeable; +import java.io.File; import java.util.*; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -56,12 +62,14 @@ import static com.google.common.collect.Maps.newTreeMap; import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder; import static org.elasticsearch.node.NodeBuilder.nodeBuilder; -public class TestCluster implements Closeable { +public class TestCluster implements Closeable, Iterable { protected final ESLogger logger = Loggers.getLogger(getClass()); /* sorted map to make traverse order reproducible */ private final TreeMap nodes = newTreeMap(); + + private final Set dataDirToClean = new HashSet(); private final String clusterName; @@ -69,8 +77,6 @@ public class TestCluster implements Closeable { private final Settings defaultSettings; - private NodeAndClient clientNode; // currently unused - private Random random; private AtomicInteger nextNodeId = new AtomicInteger(0); @@ -154,29 +160,38 @@ public class TestCluster implements Closeable { return randomNodeAndClient.node(); } NodeAndClient buildNode = buildNode(); - nodes.put(buildNode.name, buildNode); - return buildNode.node().start(); + buildNode.node().start(); + publishNode(buildNode); + return buildNode.node(); } private synchronized NodeAndClient getRandomNodeAndClient() { + Predicate all = Predicates.alwaysTrue(); + return getRandomNodeAndClient(all); + } + + + private synchronized NodeAndClient getRandomNodeAndClient(Predicate predicate) { ensureOpen(); - Collection values = nodes.values(); - int whichOne = random.nextInt(values.size()); - for (NodeAndClient nodeAndClient : values) { - if (whichOne-- == 0) { - return nodeAndClient; + Collection values = Collections2.filter(nodes.values(), predicate) ; + if (!values.isEmpty()) { + int whichOne = random.nextInt(values.size()); + for (NodeAndClient nodeAndClient : values) { + if (whichOne-- == 0) { + return nodeAndClient; + } } } return null; } - + public synchronized void ensureAtLeastNumNodes(int num) { int size = nodes.size(); for (int i = size; i < num; i++) { logger.info("increasing cluster size from {} to {}", size, num); NodeAndClient buildNode = buildNode(); buildNode.node().start(); - nodes.put(buildNode.name, buildNode); + publishNode(buildNode); } } @@ -227,13 +242,43 @@ public class TestCluster implements Closeable { return getOrBuildRandomNode().client(); } - private synchronized Client masterClient() { // should we expose this? - Collection values = nodes.values(); - for (NodeAndClient nodeAndClient : values) { - ClusterService instance = getInstance(ClusterService.class, (InternalNode)nodeAndClient.node()); - if (instance.state().getNodes().getMasterNode().id().equals(instance.localNode().id())) { - return nodeAndClient.client(random); + public synchronized Client masterClient() { + ensureOpen(); + NodeAndClient randomNodeAndClient = getRandomNodeAndClient(new MasterNodePredicate(getMasterName())); + if (randomNodeAndClient != null) { + return randomNodeAndClient.client(random); + } + return null; + } + + public synchronized Client nonMasterClient() { + ensureOpen(); + NodeAndClient randomNodeAndClient = getRandomNodeAndClient(Predicates.not(new MasterNodePredicate(getMasterName()))); + if (randomNodeAndClient != null) { + return randomNodeAndClient.client(random); + } + return null; + } + + public synchronized Client clientNodeClient() { + ensureOpen(); + NodeAndClient randomNodeAndClient = getRandomNodeAndClient(new ClientNodePredicate()); + if (randomNodeAndClient != null) { + return randomNodeAndClient.client(random); + } + return null; + } + + public synchronized Client client(final Predicate filterPredicate) { + ensureOpen(); + final NodeAndClient randomNodeAndClient = getRandomNodeAndClient(new Predicate() { + @Override + public boolean apply(NodeAndClient nodeAndClient) { + return filterPredicate.apply(nodeAndClient.node.settings()); } + }); + if (randomNodeAndClient != null) { + return randomNodeAndClient.client(random); } return null; } @@ -243,9 +288,6 @@ public class TestCluster implements Closeable { if (this.open.compareAndSet(true, false)) { IOUtils.closeWhileHandlingException(nodes.values()); nodes.clear(); - if (clientNode != null) { - IOUtils.closeWhileHandlingException(clientNode); - } } } @@ -261,14 +303,14 @@ public class TestCluster implements Closeable { } private final class NodeAndClient implements Closeable { - private final Node node; + private final InternalNode node; private Client client; private final AtomicBoolean closed = new AtomicBoolean(false); private final ClientFactory clientFactory; private final String name; NodeAndClient(String name, Node node, ClientFactory factory) { - this.node = node; + this.node = (InternalNode)node; this.name = name; this.clientFactory = factory; } @@ -357,6 +399,7 @@ public class TestCluster implements Closeable { public synchronized void beforeTest(Random random) { this.random = new Random(random.nextLong()); resetClients(); /* reset all clients - each test gets it's own client based on the Random instance created above. */ + wipeDataDirectories(); if (nextNodeId.get() == sharedNodesSeeds.length && nodes.size() == sharedNodesSeeds.length) { logger.debug("Cluster hasn't changed - moving out - nodes: [{}] nextNodeId: [{}] numSharedNodes: [{}]", nodes.keySet(), nextNodeId.get(), sharedNodesSeeds.length); return; @@ -396,7 +439,7 @@ public class TestCluster implements Closeable { } nodes.clear(); for (NodeAndClient nodeAndClient : sharedNodes) { - nodes.put(nodeAndClient.name, nodeAndClient); + publishNode(nodeAndClient); } nextNodeId.set(sharedNodesSeeds.length); assert numNodes() == sharedNodesSeeds.length; @@ -406,21 +449,53 @@ public class TestCluster implements Closeable { logger.debug("Cluster is consistent again - nodes: [{}] nextNodeId: [{}] numSharedNodes: [{}]", nodes.keySet(), nextNodeId.get(), sharedNodesSeeds.length); } + public synchronized void afterTest() { + wipeDataDirectories(); + } + private void resetClients() { final Collection nodesAndClients = nodes.values(); for (NodeAndClient nodeAndClient : nodesAndClients) { nodeAndClient.resetClient(); } } + + private void wipeDataDirectories() { + if (!dataDirToClean.isEmpty()) { + logger.info("Wipe data directory for all nodes locations: {}", this.dataDirToClean); + try { + FileSystemUtils.deleteRecursively(dataDirToClean.toArray(new File[0])); + } finally { + this.dataDirToClean.clear(); + } + } + } public synchronized ClusterService clusterService() { return getInstance(ClusterService.class); } - public synchronized T getInstance(Class clazz) { - return getInstance(clazz, ((InternalNode) getOrBuildRandomNode())); + + public synchronized T getInstance(Class clazz, final String node) { + final Predicate predicate; + if (node != null) { + predicate = new Predicate() { + public boolean apply(NodeAndClient nodeAndClient) { + return node.equals(nodeAndClient.name); + } + }; + } else { + predicate = Predicates.alwaysTrue(); + } + NodeAndClient randomNodeAndClient = getRandomNodeAndClient(predicate); + assert randomNodeAndClient != null; + return getInstanceFromNode(clazz, randomNodeAndClient.node); } - private synchronized T getInstance(Class clazz, InternalNode node) { + public synchronized T getInstance(Class clazz) { + return getInstance(clazz, null); + } + + private synchronized T getInstanceFromNode(Class clazz, InternalNode node) { return node.injector().getInstance(clazz); } @@ -432,40 +507,54 @@ public class TestCluster implements Closeable { ensureOpen(); NodeAndClient nodeAndClient = getRandomNodeAndClient(); if (nodeAndClient != null) { + logger.info("Closing random node [{}] ", nodeAndClient.name); nodes.remove(nodeAndClient.name); nodeAndClient.close(); } } - - public synchronized Iterable clients() { - final Map nodes = this.nodes; - return new Iterable() { - + + public synchronized void stopRandomNode(final Predicate filter) { + ensureOpen(); + NodeAndClient nodeAndClient = getRandomNodeAndClient(new Predicate() { @Override - public Iterator iterator() { - final Iterator iterator = nodes.values().iterator(); - return new Iterator() { - - @Override - public boolean hasNext() { - - return iterator.hasNext(); - } - - @Override - public Client next() { - return iterator.next().client(random); - } - - @Override - public void remove() { - throw new UnsupportedOperationException(""); - } - - }; + public boolean apply(NodeAndClient nodeAndClient) { + return filter.apply(nodeAndClient.node.settings()); } - }; - + }); + if (nodeAndClient != null) { + logger.info("Closing filtered random node [{}] ", nodeAndClient.name); + nodes.remove(nodeAndClient.name); + nodeAndClient.close(); + } + } + + public synchronized void stopCurrentMasterNode() { + ensureOpen(); + assert numNodes() > 0; + String masterNodeName = getMasterName(); + assert nodes.containsKey(masterNodeName); + logger.info("Closing master node [{}] ", masterNodeName); + NodeAndClient remove = nodes.remove(masterNodeName); + remove.close(); + } + + public void stopRandomNonMasterNode() { + NodeAndClient nodeAndClient = getRandomNodeAndClient(Predicates.not(new MasterNodePredicate(getMasterName()))); + if (nodeAndClient != null) { + logger.info("Closing random non master node [{}] current master [{}] ", nodeAndClient.name, getMasterName()); + nodes.remove(nodeAndClient.name); + nodeAndClient.close(); + } + } + + private String getMasterName() { + try { + ClusterState state = client().admin().cluster().prepareState().execute().actionGet().getState(); + return state.nodes().masterNode().name(); + } catch (Throwable e) { + logger.warn("Can't fetch cluster state" , e); + throw new RuntimeException("Can't get master node " + e.getMessage(), e); + } } public synchronized Set allButN(int numNodes) { @@ -477,19 +566,9 @@ public class TestCluster implements Closeable { return Sets.newHashSet(Iterators.limit(this.nodes.keySet().iterator(), numNodes)); } - private synchronized Client nodeClient() { + public synchronized void startNodeClient(Settings settings) { ensureOpen(); // currently unused - if (clientNode == null) { - String name = "client_node"; - Settings finalSettings = settingsBuilder().put(defaultSettings).put("name", name) - .build(); - Node node = nodeBuilder().settings(finalSettings).client(true).build(); - node.start(); - this.clientNode = new NodeAndClient(name, node, new ClientFactory()); - - } - return clientNode.client(random); - + startNode(settingsBuilder().put(settings).put("node.client", true)); } public synchronized Set nodesInclude(String index) { @@ -528,22 +607,83 @@ public class TestCluster implements Closeable { public String startNode() { return startNode(ImmutableSettings.EMPTY); } + + public String startNode(Settings.Builder settings) { + return startNode(settings.build()); + } public String startNode(Settings settings) { NodeAndClient buildNode = buildNode(settings); - nodes.put(buildNode.name, buildNode); buildNode.node().start(); + publishNode(buildNode); return buildNode.name; } + private void publishNode(NodeAndClient nodeAndClient) { + assert !nodeAndClient.node().isClosed(); + NodeEnvironment nodeEnv = getInstanceFromNode(NodeEnvironment.class, nodeAndClient.node); + if (nodeEnv.hasNodeFile()) { + dataDirToClean.addAll(Arrays.asList(nodeEnv.nodeDataLocations())); + } + nodes.put(nodeAndClient.name, nodeAndClient); + + } + public void resetAllGateways() throws Exception { Collection values = this.nodes.values(); for (NodeAndClient nodeAndClient : values) { - getInstance(Gateway.class, ((InternalNode) nodeAndClient.node)).reset(); + getInstanceFromNode(Gateway.class, ((InternalNode) nodeAndClient.node)).reset(); } } public void closeAllNodesAndReset() { beforeTest(random); } + + + private static final class MasterNodePredicate implements Predicate { + private final String masterNodeName; + + public MasterNodePredicate(String masterNodeName) { + this.masterNodeName = masterNodeName; + } + + @Override + public boolean apply(NodeAndClient nodeAndClient) { + return masterNodeName.equals(nodeAndClient.name); + } + } + + private static final class ClientNodePredicate implements Predicate { + + @Override + public boolean apply(NodeAndClient nodeAndClient) { + return nodeAndClient.node.settings().getAsBoolean("node.client", false); + } + } + + @Override + public synchronized Iterator iterator() { + ensureOpen(); + final Iterator iterator = nodes.values().iterator(); + return new Iterator() { + + @Override + public boolean hasNext() { + return iterator.hasNext(); + } + + @Override + public Client next() { + return iterator.next().client(random); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(""); + } + + }; + } + } diff --git a/src/test/java/org/elasticsearch/validate/SimpleValidateQueryTests.java b/src/test/java/org/elasticsearch/validate/SimpleValidateQueryTests.java index 1236a457f67..5096c824602 100644 --- a/src/test/java/org/elasticsearch/validate/SimpleValidateQueryTests.java +++ b/src/test/java/org/elasticsearch/validate/SimpleValidateQueryTests.java @@ -203,7 +203,7 @@ public class SimpleValidateQueryTests extends AbstractIntegrationTest { - for (Client client : cluster().clients()) { + for (Client client : cluster()) { ValidateQueryResponse response = client.admin().indices().prepareValidateQuery("test") .setQuery("foo".getBytes(Charsets.UTF_8)) .setExplain(true) @@ -215,7 +215,7 @@ public class SimpleValidateQueryTests extends AbstractIntegrationTest { } - for (Client client : cluster().clients()) { + for (Client client : cluster()) { ValidateQueryResponse response = client.admin().indices().prepareValidateQuery("test") .setQuery(QueryBuilders.queryString("foo")) .setExplain(true)