diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java b/core/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java index f6870cc05b6..7794c58ddd3 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java @@ -19,30 +19,9 @@ package org.elasticsearch.discovery.zen; -import java.io.Closeable; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Queue; -import java.util.Set; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; -import java.util.function.Function; - import com.carrotsearch.hppc.cursors.ObjectCursor; import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.util.Supplier; -import org.apache.lucene.util.IOUtils; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.Version; import org.elasticsearch.cluster.ClusterName; @@ -53,6 +32,8 @@ import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.lease.Releasable; +import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Setting.Property; import org.elasticsearch.common.settings.Settings; @@ -75,6 +56,25 @@ import org.elasticsearch.transport.TransportResponse; import org.elasticsearch.transport.TransportResponseHandler; import org.elasticsearch.transport.TransportService; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; + import static java.util.Collections.emptyList; import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; @@ -186,9 +186,9 @@ public class UnicastZenPing extends AbstractComponent implements ZenPing { } @Override - public void close() throws IOException { + public void close() { ThreadPool.terminate(unicastConnectExecutor, 0, TimeUnit.SECONDS); - IOUtils.close(receivedResponses.values()); + Releasables.close(receivedResponses.values()); closed = true; } @@ -272,7 +272,7 @@ public class UnicastZenPing extends AbstractComponent implements ZenPing { } } - class SendPingsHandler implements Closeable { + class SendPingsHandler implements Releasable { private final int id; private final Set nodeToDisconnect = ConcurrentCollections.newConcurrentSet(); private final PingCollection pingCollection; diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java b/core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java index f9a16243e00..272a75f4e7a 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java @@ -47,6 +47,7 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.internal.Nullable; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Setting.Property; @@ -240,6 +241,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover joinThreadControl.stop(); masterFD.stop("zen disco stop"); nodesFD.stop(); + Releasables.close(zenPing); // stop any ongoing pinging DiscoveryNodes nodes = nodes(); if (sendLeaveRequest) { if (nodes.getMasterNode() == null) { @@ -269,7 +271,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover @Override protected void doClose() throws IOException { - IOUtils.close(masterFD, nodesFD, zenPing); + IOUtils.close(masterFD, nodesFD); } @Override diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/ZenPing.java b/core/src/main/java/org/elasticsearch/discovery/zen/ZenPing.java index cb2c8cb5019..75ea701dc99 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/ZenPing.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen/ZenPing.java @@ -19,7 +19,15 @@ package org.elasticsearch.discovery.zen; -import java.io.Closeable; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Streamable; +import org.elasticsearch.common.lease.Releasable; +import org.elasticsearch.common.unit.TimeValue; + import java.io.IOException; import java.util.ArrayList; import java.util.Collection; @@ -28,17 +36,9 @@ import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicLong; -import org.elasticsearch.cluster.ClusterName; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.io.stream.Streamable; -import org.elasticsearch.common.unit.TimeValue; - import static org.elasticsearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK; -public interface ZenPing extends Closeable { +public interface ZenPing extends Releasable { void start(PingContextProvider contextProvider); diff --git a/core/src/main/java/org/elasticsearch/node/Node.java b/core/src/main/java/org/elasticsearch/node/Node.java index 9eb7f9a0376..298d6712ff0 100644 --- a/core/src/main/java/org/elasticsearch/node/Node.java +++ b/core/src/main/java/org/elasticsearch/node/Node.java @@ -75,9 +75,6 @@ import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.discovery.Discovery; import org.elasticsearch.discovery.DiscoveryModule; import org.elasticsearch.discovery.DiscoverySettings; -import org.elasticsearch.discovery.zen.UnicastHostsProvider; -import org.elasticsearch.discovery.zen.UnicastZenPing; -import org.elasticsearch.discovery.zen.ZenPing; import org.elasticsearch.env.Environment; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.gateway.GatewayAllocator; @@ -655,11 +652,13 @@ public class Node implements Closeable { injector.getInstance(SnapshotShardsService.class).stop(); // stop any changes happening as a result of cluster state changes injector.getInstance(IndicesClusterStateService.class).stop(); + // close discovery early to not react to pings anymore. + // This can confuse other nodes and delay things - mostly if we're the master and we're running tests. + injector.getInstance(Discovery.class).stop(); // we close indices first, so operations won't be allowed on it injector.getInstance(IndicesTTLService.class).stop(); injector.getInstance(RoutingService.class).stop(); injector.getInstance(ClusterService.class).stop(); - injector.getInstance(Discovery.class).stop(); injector.getInstance(NodeConnectionsService.class).stop(); injector.getInstance(MonitorService.class).stop(); injector.getInstance(GatewayService.class).stop(); diff --git a/core/src/test/java/org/elasticsearch/action/admin/indices/exists/IndicesExistsIT.java b/core/src/test/java/org/elasticsearch/action/admin/indices/exists/IndicesExistsIT.java index e289d90c7a8..b5ef7a642ad 100644 --- a/core/src/test/java/org/elasticsearch/action/admin/indices/exists/IndicesExistsIT.java +++ b/core/src/test/java/org/elasticsearch/action/admin/indices/exists/IndicesExistsIT.java @@ -31,7 +31,8 @@ import java.io.IOException; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertThrows; -@ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, numClientNodes = 0, transportClientRatio = 0.0) +@ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, numClientNodes = 0, transportClientRatio = 0.0, + autoMinMasterNodes = false) public class IndicesExistsIT extends ESIntegTestCase { public void testIndexExistsWithBlocksInPlace() throws IOException { diff --git a/core/src/test/java/org/elasticsearch/action/support/master/IndexingMasterFailoverIT.java b/core/src/test/java/org/elasticsearch/action/support/master/IndexingMasterFailoverIT.java index 96ba5729cb8..7c764ed1724 100644 --- a/core/src/test/java/org/elasticsearch/action/support/master/IndexingMasterFailoverIT.java +++ b/core/src/test/java/org/elasticsearch/action/support/master/IndexingMasterFailoverIT.java @@ -43,7 +43,7 @@ import java.util.concurrent.CyclicBarrier; import static org.hamcrest.Matchers.equalTo; -@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0) +@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, autoMinMasterNodes = false) public class IndexingMasterFailoverIT extends ESIntegTestCase { @Override diff --git a/core/src/test/java/org/elasticsearch/cluster/MinimumMasterNodesIT.java b/core/src/test/java/org/elasticsearch/cluster/MinimumMasterNodesIT.java index 3e58291d4ad..257b80663a3 100644 --- a/core/src/test/java/org/elasticsearch/cluster/MinimumMasterNodesIT.java +++ b/core/src/test/java/org/elasticsearch/cluster/MinimumMasterNodesIT.java @@ -63,7 +63,7 @@ import static org.hamcrest.Matchers.isOneOf; import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.nullValue; -@ClusterScope(scope = Scope.TEST, numDataNodes = 0) +@ClusterScope(scope = Scope.TEST, numDataNodes = 0, autoMinMasterNodes = false) @TestLogging("_root:DEBUG,org.elasticsearch.cluster.service:TRACE,org.elasticsearch.discovery.zen:TRACE") public class MinimumMasterNodesIT extends ESIntegTestCase { @@ -275,12 +275,14 @@ public class MinimumMasterNodesIT extends ESIntegTestCase { .put("discovery.initial_state_timeout", "500ms") .build(); - logger.info("--> start 2 nodes"); - internalCluster().startNodesAsync(2, settings).get(); + logger.info("--> start first node and wait for it to be a master"); + internalCluster().startNode(settings); + ensureClusterSizeConsistency(); // wait until second node join the cluster - ClusterHealthResponse clusterHealthResponse = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForNodes("2").get(); - assertThat(clusterHealthResponse.isTimedOut(), equalTo(false)); + logger.info("--> start second node and wait for it to join"); + internalCluster().startNode(settings); + ensureClusterSizeConsistency(); logger.info("--> setting minimum master node to 2"); setMinimumMasterNodes(2); @@ -298,8 +300,7 @@ public class MinimumMasterNodesIT extends ESIntegTestCase { logger.info("--> bringing another node up"); internalCluster().startNode(Settings.builder().put(settings).put(ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey(), 2).build()); - clusterHealthResponse = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForNodes("2").get(); - assertThat(clusterHealthResponse.isTimedOut(), equalTo(false)); + ensureClusterSizeConsistency(); } private void assertNoMasterBlockOnAllNodes() throws InterruptedException { diff --git a/core/src/test/java/org/elasticsearch/cluster/NoMasterNodeIT.java b/core/src/test/java/org/elasticsearch/cluster/NoMasterNodeIT.java index f73043ce4e4..0be4f7c4e51 100644 --- a/core/src/test/java/org/elasticsearch/cluster/NoMasterNodeIT.java +++ b/core/src/test/java/org/elasticsearch/cluster/NoMasterNodeIT.java @@ -49,7 +49,7 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.lessThan; -@ClusterScope(scope = Scope.TEST, numDataNodes = 0) +@ClusterScope(scope = Scope.TEST, numDataNodes = 0, autoMinMasterNodes = false) public class NoMasterNodeIT extends ESIntegTestCase { @Override diff --git a/core/src/test/java/org/elasticsearch/cluster/ack/AckClusterUpdateSettingsIT.java b/core/src/test/java/org/elasticsearch/cluster/ack/AckClusterUpdateSettingsIT.java index f47134a45b8..ab3f82fff75 100644 --- a/core/src/test/java/org/elasticsearch/cluster/ack/AckClusterUpdateSettingsIT.java +++ b/core/src/test/java/org/elasticsearch/cluster/ack/AckClusterUpdateSettingsIT.java @@ -69,7 +69,10 @@ public class AckClusterUpdateSettingsIT extends ESIntegTestCase { private void removePublishTimeout() { //to test that the acknowledgement mechanism is working we better disable the wait for publish //otherwise the operation is most likely acknowledged even if it doesn't support ack - assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder().put(DiscoverySettings.PUBLISH_TIMEOUT_SETTING.getKey(), "0"))); + assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder() + .put(DiscoverySettings.PUBLISH_TIMEOUT_SETTING.getKey(), "0") + .put(DiscoverySettings.COMMIT_TIMEOUT_SETTING.getKey(), "30s") + )); } public void testClusterUpdateSettingsAcknowledgement() { diff --git a/core/src/test/java/org/elasticsearch/cluster/ack/AckIT.java b/core/src/test/java/org/elasticsearch/cluster/ack/AckIT.java index 2ea4f755156..f51a4f11ae1 100644 --- a/core/src/test/java/org/elasticsearch/cluster/ack/AckIT.java +++ b/core/src/test/java/org/elasticsearch/cluster/ack/AckIT.java @@ -36,7 +36,6 @@ import org.elasticsearch.cluster.routing.RoutingNode; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand; -import org.elasticsearch.common.collect.Iterators; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.discovery.DiscoverySettings; import org.elasticsearch.index.Index; @@ -61,7 +60,9 @@ public class AckIT extends ESIntegTestCase { //to test that the acknowledgement mechanism is working we better disable the wait for publish //otherwise the operation is most likely acknowledged even if it doesn't support ack return Settings.builder().put(super.nodeSettings(nodeOrdinal)) - .put(DiscoverySettings.PUBLISH_TIMEOUT_SETTING.getKey(), 0).build(); + .put(DiscoverySettings.COMMIT_TIMEOUT_SETTING.getKey(), "30s") // explicitly set so it won't default to publish timeout + .put(DiscoverySettings.PUBLISH_TIMEOUT_SETTING.getKey(), "0s") // don't wait post commit to check acking + .build(); } public void testUpdateSettingsAcknowledgement() { diff --git a/core/src/test/java/org/elasticsearch/cluster/allocation/AwarenessAllocationIT.java b/core/src/test/java/org/elasticsearch/cluster/allocation/AwarenessAllocationIT.java index d98f9294243..19657b05480 100644 --- a/core/src/test/java/org/elasticsearch/cluster/allocation/AwarenessAllocationIT.java +++ b/core/src/test/java/org/elasticsearch/cluster/allocation/AwarenessAllocationIT.java @@ -30,7 +30,6 @@ import org.elasticsearch.cluster.routing.allocation.decider.AwarenessAllocationD import org.elasticsearch.common.Priority; import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.discovery.zen.ElectMasterService; import org.elasticsearch.discovery.zen.ZenDiscovery; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.ESIntegTestCase.ClusterScope; @@ -104,7 +103,6 @@ public class AwarenessAllocationIT extends ESIntegTestCase { Settings commonSettings = Settings.builder() .put(AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_FORCE_GROUP_SETTING.getKey() + "zone.values", "a,b") .put(AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING.getKey(), "zone") - .put(ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey(), 3) .put(ZenDiscovery.JOIN_TIMEOUT_SETTING.getKey(), "10s") .build(); diff --git a/core/src/test/java/org/elasticsearch/cluster/service/ClusterServiceIT.java b/core/src/test/java/org/elasticsearch/cluster/service/ClusterServiceIT.java index 3d345f24dbe..f056eded34b 100644 --- a/core/src/test/java/org/elasticsearch/cluster/service/ClusterServiceIT.java +++ b/core/src/test/java/org/elasticsearch/cluster/service/ClusterServiceIT.java @@ -20,32 +20,20 @@ package org.elasticsearch.cluster.service; import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.util.Supplier; -import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.admin.cluster.tasks.PendingClusterTasksResponse; import org.elasticsearch.cluster.AckedClusterStateUpdateTask; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateUpdateTask; -import org.elasticsearch.cluster.LocalNodeMasterListener; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.Nullable; -import org.elasticsearch.common.Priority; -import org.elasticsearch.common.component.AbstractLifecycleComponent; -import org.elasticsearch.common.component.LifecycleComponent; -import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.inject.Singleton; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.discovery.zen.ZenDiscovery; -import org.elasticsearch.plugins.Plugin; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.ESIntegTestCase.ClusterScope; import org.elasticsearch.test.ESIntegTestCase.Scope; import org.elasticsearch.test.junit.annotations.TestLogging; -import org.elasticsearch.threadpool.ThreadPool; -import java.util.ArrayList; import java.util.Arrays; -import java.util.Collection; import java.util.HashSet; import java.util.List; import java.util.Set; @@ -56,17 +44,10 @@ import java.util.concurrent.atomic.AtomicBoolean; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.greaterThanOrEqualTo; -import static org.hamcrest.Matchers.is; -import static org.hamcrest.Matchers.notNullValue; @ClusterScope(scope = Scope.TEST, numDataNodes = 0) public class ClusterServiceIT extends ESIntegTestCase { - @Override - protected Collection> nodePlugins() { - return Arrays.asList(TestPlugin.class); - } - public void testAckedUpdateTask() throws Exception { internalCluster().startNode(); ClusterService clusterService = internalCluster().getInstance(ClusterService.class); @@ -482,141 +463,4 @@ public class ClusterServiceIT extends ESIntegTestCase { assertTrue(controlSources.isEmpty()); block2.countDown(); } - - public void testLocalNodeMasterListenerCallbacks() throws Exception { - Settings settings = Settings.builder() - .put("discovery.zen.minimum_master_nodes", 1) - .put(ZenDiscovery.PING_TIMEOUT_SETTING.getKey(), "400ms") - .put("discovery.initial_state_timeout", "500ms") - .build(); - - String node_0 = internalCluster().startNode(settings); - ClusterService clusterService = internalCluster().getInstance(ClusterService.class); - MasterAwareService testService = internalCluster().getInstance(MasterAwareService.class); - - ClusterHealthResponse clusterHealth = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID) - .setWaitForNodes("1").get(); - assertThat(clusterHealth.isTimedOut(), equalTo(false)); - - // the first node should be a master as the minimum required is 1 - assertThat(clusterService.state().nodes().getMasterNode(), notNullValue()); - assertThat(clusterService.state().nodes().isLocalNodeElectedMaster(), is(true)); - assertThat(testService.master(), is(true)); - String node_1 = internalCluster().startNode(settings); - final ClusterService clusterService1 = internalCluster().getInstance(ClusterService.class, node_1); - MasterAwareService testService1 = internalCluster().getInstance(MasterAwareService.class, node_1); - - clusterHealth = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForNodes("2").get(); - assertThat(clusterHealth.isTimedOut(), equalTo(false)); - - // the second node should not be the master as node1 is already the master. - assertThat(clusterService1.state().nodes().isLocalNodeElectedMaster(), is(false)); - assertThat(testService1.master(), is(false)); - - internalCluster().stopCurrentMasterNode(); - clusterHealth = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForNodes("1").get(); - assertThat(clusterHealth.isTimedOut(), equalTo(false)); - - // now that node0 is closed, node1 should be elected as master - assertThat(clusterService1.state().nodes().isLocalNodeElectedMaster(), is(true)); - assertThat(testService1.master(), is(true)); - - // start another node and set min_master_node - internalCluster().startNode(Settings.builder().put(settings)); - assertFalse(client().admin().cluster().prepareHealth().setWaitForNodes("2").get().isTimedOut()); - - Settings transientSettings = Settings.builder() - .put("discovery.zen.minimum_master_nodes", 2) - .build(); - client().admin().cluster().prepareUpdateSettings().setTransientSettings(transientSettings).get(); - - // and shutdown the second node - internalCluster().stopRandomNonMasterNode(); - - // there should not be any master as the minimum number of required eligible masters is not met - awaitBusy(() -> clusterService1.state().nodes().getMasterNode() == null && - clusterService1.clusterServiceState().getClusterStateStatus() == ClusterStateStatus.APPLIED); - assertThat(testService1.master(), is(false)); - - // bring the node back up - String node_2 = internalCluster().startNode(Settings.builder().put(settings).put(transientSettings)); - ClusterService clusterService2 = internalCluster().getInstance(ClusterService.class, node_2); - MasterAwareService testService2 = internalCluster().getInstance(MasterAwareService.class, node_2); - - // make sure both nodes see each other otherwise the masternode below could be null if node 2 is master and node 1 did'r receive - // the updated cluster state... - assertThat(internalCluster().client(node_1).admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setLocal(true) - .setWaitForNodes("2").get().isTimedOut(), is(false)); - assertThat(internalCluster().client(node_2).admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setLocal(true) - .setWaitForNodes("2").get().isTimedOut(), is(false)); - - // now that we started node1 again, a new master should be elected - assertThat(clusterService2.state().nodes().getMasterNode(), is(notNullValue())); - if (node_2.equals(clusterService2.state().nodes().getMasterNode().getName())) { - assertThat(testService1.master(), is(false)); - assertThat(testService2.master(), is(true)); - } else { - assertThat(testService1.master(), is(true)); - assertThat(testService2.master(), is(false)); - } - } - - public static class TestPlugin extends Plugin { - - @Override - public Collection> getGuiceServiceClasses() { - List> services = new ArrayList<>(1); - services.add(MasterAwareService.class); - return services; - } - } - - @Singleton - public static class MasterAwareService extends AbstractLifecycleComponent implements LocalNodeMasterListener { - - private final ClusterService clusterService; - private volatile boolean master; - - @Inject - public MasterAwareService(Settings settings, ClusterService clusterService) { - super(settings); - clusterService.add(this); - this.clusterService = clusterService; - logger.info("initialized test service"); - } - - @Override - public void onMaster() { - logger.info("on master [{}]", clusterService.localNode()); - master = true; - } - - @Override - public void offMaster() { - logger.info("off master [{}]", clusterService.localNode()); - master = false; - } - - public boolean master() { - return master; - } - - @Override - protected void doStart() { - } - - @Override - protected void doStop() { - } - - @Override - protected void doClose() { - } - - @Override - public String executorName() { - return ThreadPool.Names.SAME; - } - - } } diff --git a/core/src/test/java/org/elasticsearch/cluster/service/ClusterServiceTests.java b/core/src/test/java/org/elasticsearch/cluster/service/ClusterServiceTests.java index 9fd4fc18514..bede01ed21b 100644 --- a/core/src/test/java/org/elasticsearch/cluster/service/ClusterServiceTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/service/ClusterServiceTests.java @@ -30,6 +30,7 @@ import org.elasticsearch.cluster.ClusterStateTaskConfig; import org.elasticsearch.cluster.ClusterStateTaskExecutor; import org.elasticsearch.cluster.ClusterStateTaskListener; import org.elasticsearch.cluster.ClusterStateUpdateTask; +import org.elasticsearch.cluster.LocalNodeMasterListener; import org.elasticsearch.cluster.NodeConnectionsService; import org.elasticsearch.cluster.block.ClusterBlocks; import org.elasticsearch.cluster.node.DiscoveryNode; @@ -44,6 +45,7 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.BaseFuture; import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.discovery.Discovery; +import org.elasticsearch.discovery.DiscoverySettings; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.MockLogAppender; import org.elasticsearch.test.junit.annotations.TestLogging; @@ -1098,6 +1100,48 @@ public class ClusterServiceTests extends ESTestCase { timedClusterService.close(); } + public void testLocalNodeMasterListenerCallbacks() throws Exception { + TimedClusterService timedClusterService = createTimedClusterService(false); + + AtomicBoolean isMaster = new AtomicBoolean(); + timedClusterService.add(new LocalNodeMasterListener() { + @Override + public void onMaster() { + isMaster.set(true); + } + + @Override + public void offMaster() { + isMaster.set(false); + } + + @Override + public String executorName() { + return ThreadPool.Names.SAME; + } + }); + + ClusterState state = timedClusterService.state(); + DiscoveryNodes nodes = state.nodes(); + DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder(nodes).masterNodeId(nodes.getLocalNodeId()); + state = ClusterState.builder(state).blocks(ClusterBlocks.EMPTY_CLUSTER_BLOCK).nodes(nodesBuilder).build(); + setState(timedClusterService, state); + assertThat(isMaster.get(), is(true)); + + nodes = state.nodes(); + nodesBuilder = DiscoveryNodes.builder(nodes).masterNodeId(null); + state = ClusterState.builder(state).blocks(ClusterBlocks.builder().addGlobalBlock(DiscoverySettings.NO_MASTER_BLOCK_WRITES)) + .nodes(nodesBuilder).build(); + setState(timedClusterService, state); + assertThat(isMaster.get(), is(false)); + nodesBuilder = DiscoveryNodes.builder(nodes).masterNodeId(nodes.getLocalNodeId()); + state = ClusterState.builder(state).blocks(ClusterBlocks.EMPTY_CLUSTER_BLOCK).nodes(nodesBuilder).build(); + setState(timedClusterService, state); + assertThat(isMaster.get(), is(true)); + + timedClusterService.close(); + } + private static class SimpleTask { private final int id; diff --git a/core/src/test/java/org/elasticsearch/discovery/DiscoveryWithServiceDisruptionsIT.java b/core/src/test/java/org/elasticsearch/discovery/DiscoveryWithServiceDisruptionsIT.java index 22844e05881..1688157c0d1 100644 --- a/core/src/test/java/org/elasticsearch/discovery/DiscoveryWithServiceDisruptionsIT.java +++ b/core/src/test/java/org/elasticsearch/discovery/DiscoveryWithServiceDisruptionsIT.java @@ -122,7 +122,7 @@ import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.nullValue; -@ClusterScope(scope = Scope.TEST, numDataNodes = 0, transportClientRatio = 0) +@ClusterScope(scope = Scope.TEST, numDataNodes = 0, transportClientRatio = 0, autoMinMasterNodes = false) @TestLogging("_root:DEBUG,org.elasticsearch.cluster.service:TRACE") public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase { diff --git a/core/src/test/java/org/elasticsearch/discovery/ZenUnicastDiscoveryIT.java b/core/src/test/java/org/elasticsearch/discovery/ZenUnicastDiscoveryIT.java index 3af2e32eefa..b708ab4c26a 100644 --- a/core/src/test/java/org/elasticsearch/discovery/ZenUnicastDiscoveryIT.java +++ b/core/src/test/java/org/elasticsearch/discovery/ZenUnicastDiscoveryIT.java @@ -34,7 +34,7 @@ import java.util.concurrent.ExecutionException; import static org.hamcrest.Matchers.equalTo; -@ClusterScope(scope = Scope.TEST, numDataNodes = 0) +@ClusterScope(scope = Scope.TEST, numDataNodes = 0, autoMinMasterNodes = false) public class ZenUnicastDiscoveryIT extends ESIntegTestCase { private ClusterDiscoveryConfiguration discoveryConfig; diff --git a/core/src/test/java/org/elasticsearch/gateway/QuorumGatewayIT.java b/core/src/test/java/org/elasticsearch/gateway/QuorumGatewayIT.java index 226b1422b4d..8284388d2ce 100644 --- a/core/src/test/java/org/elasticsearch/gateway/QuorumGatewayIT.java +++ b/core/src/test/java/org/elasticsearch/gateway/QuorumGatewayIT.java @@ -23,7 +23,6 @@ import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.health.ClusterHealthStatus; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.discovery.zen.ElectMasterService; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.ESIntegTestCase.ClusterScope; import org.elasticsearch.test.ESIntegTestCase.Scope; @@ -37,7 +36,7 @@ import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures; -@ClusterScope(numDataNodes =0, scope= Scope.TEST) +@ClusterScope(numDataNodes = 0, scope = Scope.TEST) public class QuorumGatewayIT extends ESIntegTestCase { @Override protected int numberOfReplicas() { @@ -47,8 +46,7 @@ public class QuorumGatewayIT extends ESIntegTestCase { public void testQuorumRecovery() throws Exception { logger.info("--> starting 3 nodes"); // we are shutting down nodes - make sure we don't have 2 clusters if we test network - internalCluster().startNodesAsync(3, - Settings.builder().put(ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey(), 2).build()).get(); + internalCluster().startNodesAsync(3).get(); createIndex("test"); diff --git a/core/src/test/java/org/elasticsearch/gateway/RecoverAfterNodesIT.java b/core/src/test/java/org/elasticsearch/gateway/RecoverAfterNodesIT.java index 1e35bcdd469..1794aa22728 100644 --- a/core/src/test/java/org/elasticsearch/gateway/RecoverAfterNodesIT.java +++ b/core/src/test/java/org/elasticsearch/gateway/RecoverAfterNodesIT.java @@ -34,7 +34,7 @@ import java.util.Set; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasItem; -@ClusterScope(scope = Scope.TEST, numDataNodes = 0) +@ClusterScope(scope = Scope.TEST, numDataNodes = 0, autoMinMasterNodes = false) public class RecoverAfterNodesIT extends ESIntegTestCase { private static final TimeValue BLOCK_WAIT_TIMEOUT = TimeValue.timeValueSeconds(10); diff --git a/core/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayIT.java b/core/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayIT.java index 4e1ff3f8678..052bfc00ef2 100644 --- a/core/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayIT.java +++ b/core/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayIT.java @@ -23,7 +23,6 @@ import com.carrotsearch.hppc.cursors.ObjectCursor; import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse; import org.elasticsearch.action.admin.indices.stats.IndexStats; import org.elasticsearch.action.admin.indices.stats.ShardStats; -import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.node.DiscoveryNode; @@ -44,6 +43,7 @@ import org.elasticsearch.plugins.Plugin; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.ESIntegTestCase.ClusterScope; import org.elasticsearch.test.ESIntegTestCase.Scope; +import org.elasticsearch.test.InternalTestCluster; import org.elasticsearch.test.InternalTestCluster.RestartCallback; import org.elasticsearch.test.store.MockFSDirectoryService; import org.elasticsearch.test.store.MockFSIndexStore; @@ -333,48 +333,43 @@ public class RecoveryFromGatewayIT extends ESIntegTestCase { String metaDataUuid = client().admin().cluster().prepareState().execute().get().getState().getMetaData().clusterUUID(); assertThat(metaDataUuid, not(equalTo("_na_"))); - Map primaryTerms = assertAndCapturePrimaryTerms(null); - logger.info("--> closing first node, and indexing more data to the second node"); - internalCluster().fullRestart(new RestartCallback() { + internalCluster().stopRandomDataNode(); - @Override - public void doAfterNodes(int numNodes, Client client) throws Exception { - if (numNodes == 1) { - logger.info("--> one node is closed - start indexing data into the second one"); - client.prepareIndex("test", "type1", "3").setSource(jsonBuilder().startObject().field("field", "value3").endObject()).execute().actionGet(); - // TODO: remove once refresh doesn't fail immediately if there a master block: - // https://github.com/elastic/elasticsearch/issues/9997 - client.admin().cluster().prepareHealth("test").setWaitForYellowStatus().get(); - client.admin().indices().prepareRefresh().execute().actionGet(); + logger.info("--> one node is closed - start indexing data into the second one"); + client().prepareIndex("test", "type1", "3").setSource(jsonBuilder().startObject().field("field", "value3").endObject()).execute().actionGet(); + // TODO: remove once refresh doesn't fail immediately if there a master block: + // https://github.com/elastic/elasticsearch/issues/9997 + client().admin().cluster().prepareHealth("test").setWaitForYellowStatus().get(); + client().admin().indices().prepareRefresh().execute().actionGet(); - for (int i = 0; i < 10; i++) { - assertHitCount(client.prepareSearch().setSize(0).setQuery(matchAllQuery()).execute().actionGet(), 3); - } + for (int i = 0; i < 10; i++) { + assertHitCount(client().prepareSearch().setSize(0).setQuery(matchAllQuery()).execute().actionGet(), 3); + } - logger.info("--> add some metadata, additional type and template"); - client.admin().indices().preparePutMapping("test").setType("type2") - .setSource(jsonBuilder().startObject().startObject("type2").endObject().endObject()) - .execute().actionGet(); - client.admin().indices().preparePutTemplate("template_1") - .setPatterns(Collections.singletonList("te*")) - .setOrder(0) - .addMapping("type1", XContentFactory.jsonBuilder().startObject().startObject("type1").startObject("properties") - .startObject("field1").field("type", "text").field("store", true).endObject() - .startObject("field2").field("type", "keyword").field("store", true).endObject() - .endObject().endObject().endObject()) - .execute().actionGet(); - client.admin().indices().prepareAliases().addAlias("test", "test_alias", QueryBuilders.termQuery("field", "value")).execute().actionGet(); - logger.info("--> starting two nodes back, verifying we got the latest version"); - } + logger.info("--> add some metadata, additional type and template"); + client().admin().indices().preparePutMapping("test").setType("type2") + .setSource(jsonBuilder().startObject().startObject("type2").endObject().endObject()) + .execute().actionGet(); + client().admin().indices().preparePutTemplate("template_1") + .setTemplate("te*") + .setOrder(0) + .addMapping("type1", XContentFactory.jsonBuilder().startObject().startObject("type1").startObject("properties") + .startObject("field1").field("type", "text").field("store", true).endObject() + .startObject("field2").field("type", "keyword").field("store", true).endObject() + .endObject().endObject().endObject()) + .execute().actionGet(); + client().admin().indices().prepareAliases().addAlias("test", "test_alias", QueryBuilders.termQuery("field", "value")).execute().actionGet(); - } + logger.info("--> stopping the second node"); + internalCluster().stopRandomDataNode(); - }); + logger.info("--> starting the two nodes back"); + + internalCluster().startNodesAsync(2, Settings.builder().put("gateway.recover_after_nodes", 2).build()).get(); logger.info("--> running cluster_health (wait for the shards to startup)"); ensureGreen(); - primaryTerms = assertAndCapturePrimaryTerms(primaryTerms); assertThat(client().admin().cluster().prepareState().execute().get().getState().getMetaData().clusterUUID(), equalTo(metaDataUuid)); @@ -502,27 +497,28 @@ public class RecoveryFromGatewayIT extends ESIntegTestCase { public void testRecoveryDifferentNodeOrderStartup() throws Exception { // we need different data paths so we make sure we start the second node fresh - final String node_1 = internalCluster().startNode(Settings.builder().put(Environment.PATH_DATA_SETTING.getKey(), createTempDir()).build()); + final Path pathNode1 = createTempDir(); + final String node_1 = internalCluster().startNode(Settings.builder().put(Environment.PATH_DATA_SETTING.getKey(), pathNode1).build()); client().prepareIndex("test", "type1", "1").setSource("field", "value").execute().actionGet(); - internalCluster().startNode(Settings.builder().put(Environment.PATH_DATA_SETTING.getKey(), createTempDir()).build()); + final Path pathNode2 = createTempDir(); + final String node_2 = internalCluster().startNode(Settings.builder().put(Environment.PATH_DATA_SETTING.getKey(), pathNode2).build()); ensureGreen(); Map primaryTerms = assertAndCapturePrimaryTerms(null); - - internalCluster().fullRestart(new RestartCallback() { - - @Override - public boolean doRestart(String nodeName) { - return !node_1.equals(nodeName); - } - }); - + if (randomBoolean()) { + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(node_1)); + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(node_2)); + } else { + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(node_2)); + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(node_1)); + } + // start the second node again + internalCluster().startNode(Settings.builder().put(Environment.PATH_DATA_SETTING.getKey(), pathNode2).build()); ensureYellow(); primaryTerms = assertAndCapturePrimaryTerms(primaryTerms); - assertThat(client().admin().indices().prepareExists("test").execute().actionGet().isExists(), equalTo(true)); assertHitCount(client().prepareSearch("test").setSize(0).setQuery(QueryBuilders.matchAllQuery()).execute().actionGet(), 1); } diff --git a/core/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java b/core/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java index dfe9a09fb2a..e0a6111833b 100644 --- a/core/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java +++ b/core/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java @@ -555,10 +555,8 @@ public class IndexRecoveryIT extends ESIntegTestCase { // start a master node internalCluster().startNode(nodeSettings); - InternalTestCluster.Async blueFuture = internalCluster().startNodeAsync(Settings.builder().put("node.attr.color", "blue").put(nodeSettings).build()); - InternalTestCluster.Async redFuture = internalCluster().startNodeAsync(Settings.builder().put("node.attr.color", "red").put(nodeSettings).build()); - final String blueNodeName = blueFuture.get(); - final String redNodeName = redFuture.get(); + final String blueNodeName = internalCluster().startNode(Settings.builder().put("node.attr.color", "blue").put(nodeSettings).build()); + final String redNodeName = internalCluster().startNode(Settings.builder().put("node.attr.color", "red").put(nodeSettings).build()); ClusterHealthResponse response = client().admin().cluster().prepareHealth().setWaitForNodes(">=3").get(); assertThat(response.isTimedOut(), is(false)); diff --git a/core/src/test/java/org/elasticsearch/indices/state/RareClusterStateIT.java b/core/src/test/java/org/elasticsearch/indices/state/RareClusterStateIT.java index c55fc514332..fe3b569755d 100644 --- a/core/src/test/java/org/elasticsearch/indices/state/RareClusterStateIT.java +++ b/core/src/test/java/org/elasticsearch/indices/state/RareClusterStateIT.java @@ -209,7 +209,10 @@ public class RareClusterStateIT extends ESIntegTestCase { // but the change might not be on the node that performed the indexing // operation yet - Settings settings = Settings.builder().put(DiscoverySettings.PUBLISH_TIMEOUT_SETTING.getKey(), "0ms").build(); + Settings settings = Settings.builder() + .put(DiscoverySettings.COMMIT_TIMEOUT_SETTING.getKey(), "30s") // explicitly set so it won't default to publish timeout + .put(DiscoverySettings.PUBLISH_TIMEOUT_SETTING.getKey(), "0s") // don't wait post commit as we are blocking things by design + .build(); final List nodeNames = internalCluster().startNodesAsync(2, settings).get(); assertFalse(client().admin().cluster().prepareHealth().setWaitForNodes("2").get().isTimedOut()); @@ -327,7 +330,6 @@ public class RareClusterStateIT extends ESIntegTestCase { // time of indexing it final List nodeNames = internalCluster().startNodesAsync(2, Settings.builder() - .put(ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey(), 2) .put(DiscoverySettings.COMMIT_TIMEOUT_SETTING.getKey(), "30s") // explicitly set so it won't default to publish timeout .put(DiscoverySettings.PUBLISH_TIMEOUT_SETTING.getKey(), "0s") // don't wait post commit as we are blocking things by design .build()).get(); diff --git a/core/src/test/java/org/elasticsearch/recovery/FullRollingRestartIT.java b/core/src/test/java/org/elasticsearch/recovery/FullRollingRestartIT.java index 4a61bebd4db..dc388677050 100644 --- a/core/src/test/java/org/elasticsearch/recovery/FullRollingRestartIT.java +++ b/core/src/test/java/org/elasticsearch/recovery/FullRollingRestartIT.java @@ -72,16 +72,15 @@ public class FullRollingRestartIT extends ESIntegTestCase { } logger.info("--> now start adding nodes"); - internalCluster().startNodesAsync(2, settings).get(); + internalCluster().startNode(settings); + internalCluster().startNode(settings); // make sure the cluster state is green, and all has been recovered assertTimeout(client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setTimeout(healthTimeout).setWaitForGreenStatus().setWaitForNoRelocatingShards(true).setWaitForNodes("3")); logger.info("--> add two more nodes"); - internalCluster().startNodesAsync(2, settings).get(); - - // We now have 5 nodes - setMinimumMasterNodes(3); + internalCluster().startNode(settings); + internalCluster().startNode(settings); // make sure the cluster state is green, and all has been recovered assertTimeout(client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setTimeout(healthTimeout).setWaitForGreenStatus().setWaitForNoRelocatingShards(true).setWaitForNodes("5")); @@ -97,9 +96,6 @@ public class FullRollingRestartIT extends ESIntegTestCase { // make sure the cluster state is green, and all has been recovered assertTimeout(client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setTimeout(healthTimeout).setWaitForGreenStatus().setWaitForNoRelocatingShards(true).setWaitForNodes("4")); - // going down to 3 nodes. note that the min_master_node may not be in effect when we shutdown the 4th - // node, but that's OK as it is set to 3 before. - setMinimumMasterNodes(2); internalCluster().stopRandomDataNode(); // make sure the cluster state is green, and all has been recovered assertTimeout(client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setTimeout(healthTimeout).setWaitForGreenStatus().setWaitForNoRelocatingShards(true).setWaitForNodes("3")); @@ -115,8 +111,6 @@ public class FullRollingRestartIT extends ESIntegTestCase { // make sure the cluster state is green, and all has been recovered assertTimeout(client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setTimeout(healthTimeout).setWaitForGreenStatus().setWaitForNoRelocatingShards(true).setWaitForNodes("2")); - // closing the 2nd node - setMinimumMasterNodes(1); internalCluster().stopRandomDataNode(); // make sure the cluster state is yellow, and all has been recovered diff --git a/core/src/test/java/org/elasticsearch/recovery/RelocationIT.java b/core/src/test/java/org/elasticsearch/recovery/RelocationIT.java index 8a59b7460b3..bd474280e4e 100644 --- a/core/src/test/java/org/elasticsearch/recovery/RelocationIT.java +++ b/core/src/test/java/org/elasticsearch/recovery/RelocationIT.java @@ -44,8 +44,8 @@ import org.elasticsearch.index.shard.IndexEventListener; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardState; import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.indices.recovery.RecoveryFileChunkRequest; import org.elasticsearch.indices.recovery.PeerRecoveryTargetService; +import org.elasticsearch.indices.recovery.RecoveryFileChunkRequest; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHits; @@ -53,7 +53,6 @@ import org.elasticsearch.test.BackgroundIndexer; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.ESIntegTestCase.ClusterScope; import org.elasticsearch.test.ESIntegTestCase.Scope; -import org.elasticsearch.test.InternalTestCluster; import org.elasticsearch.test.MockIndexEventListener; import org.elasticsearch.test.junit.annotations.TestLogging; import org.elasticsearch.test.transport.MockTransportService; @@ -77,6 +76,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; +import java.util.stream.Stream; import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; @@ -351,7 +351,8 @@ public class RelocationIT extends ESIntegTestCase { client().admin().indices().prepareCreate(indexName) .setSettings(Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1, IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)).get(); - internalCluster().startNodesAsync(2).get(); + internalCluster().startNode(); + internalCluster().startNode(); List requests = new ArrayList<>(); int numDocs = scaledRandomIntBetween(25, 250); @@ -424,14 +425,15 @@ public class RelocationIT extends ESIntegTestCase { public void testIndexAndRelocateConcurrently() throws ExecutionException, InterruptedException { int halfNodes = randomIntBetween(1, 3); - Settings blueSetting = Settings.builder().put("node.attr.color", "blue").build(); - InternalTestCluster.Async> blueFuture = internalCluster().startNodesAsync(halfNodes, blueSetting); - Settings redSetting = Settings.builder().put("node.attr.color", "red").build(); - InternalTestCluster.Async> redFuture = internalCluster().startNodesAsync(halfNodes, redSetting); - blueFuture.get(); - redFuture.get(); - logger.info("blue nodes: {}", blueFuture.get()); - logger.info("red nodes: {}", redFuture.get()); + Settings[] nodeSettings = Stream.concat( + Stream.generate(() -> Settings.builder().put("node.attr.color", "blue").build()).limit(halfNodes), + Stream.generate(() -> Settings.builder().put("node.attr.color", "red").build()).limit(halfNodes) + ).toArray(Settings[]::new); + List nodes = internalCluster().startNodesAsync(nodeSettings).get(); + String[] blueNodes = nodes.subList(0, halfNodes).stream().toArray(String[]::new); + String[] redNodes = nodes.subList(halfNodes, nodes.size()).stream().toArray(String[]::new); + logger.info("blue nodes: {}", (Object)blueNodes); + logger.info("red nodes: {}", (Object)redNodes); ensureStableCluster(halfNodes * 2); assertAcked(prepareCreate("test").setSettings(Settings.builder() @@ -439,7 +441,7 @@ public class RelocationIT extends ESIntegTestCase { .put(indexSettings()) .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, randomInt(halfNodes - 1)) )); - assertAllShardsOnNodes("test", redFuture.get().toArray(new String[2])); + assertAllShardsOnNodes("test", redNodes); int numDocs = randomIntBetween(100, 150); ArrayList ids = new ArrayList<>(); logger.info(" --> indexing [{}] docs", numDocs); diff --git a/core/src/test/java/org/elasticsearch/tribe/TribeIT.java b/core/src/test/java/org/elasticsearch/tribe/TribeIT.java index 6121b2c0c86..cf4fe03893f 100644 --- a/core/src/test/java/org/elasticsearch/tribe/TribeIT.java +++ b/core/src/test/java/org/elasticsearch/tribe/TribeIT.java @@ -19,18 +19,6 @@ package org.elasticsearch.tribe; -import java.io.IOException; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.List; -import java.util.Set; -import java.util.function.Consumer; -import java.util.function.Function; -import java.util.function.Predicate; -import java.util.stream.Stream; -import java.util.stream.StreamSupport; - import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.support.DestructiveOperations; import org.elasticsearch.client.Client; @@ -58,6 +46,18 @@ import org.junit.After; import org.junit.AfterClass; import org.junit.Before; +import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Set; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.function.Predicate; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; + import static java.util.stream.Collectors.toSet; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; @@ -121,13 +121,13 @@ public class TribeIT extends ESIntegTestCase { final Collection> plugins = nodePlugins(); if (cluster1 == null) { - cluster1 = new InternalTestCluster(randomLong(), createTempDir(), true, minNumDataNodes, maxNumDataNodes, + cluster1 = new InternalTestCluster(randomLong(), createTempDir(), true, true, minNumDataNodes, maxNumDataNodes, UUIDs.randomBase64UUID(random()), nodeConfigurationSource, 0, false, "cluster_1", plugins, Function.identity()); } if (cluster2 == null) { - cluster2 = new InternalTestCluster(randomLong(), createTempDir(), true, minNumDataNodes, maxNumDataNodes, + cluster2 = new InternalTestCluster(randomLong(), createTempDir(), true, true, minNumDataNodes, maxNumDataNodes, UUIDs.randomBase64UUID(random()), nodeConfigurationSource, 0, false, "cluster_2", plugins, Function.identity()); } diff --git a/plugins/discovery-azure-classic/src/test/java/org/elasticsearch/discovery/azure/classic/AzureMinimumMasterNodesTests.java b/plugins/discovery-azure-classic/src/test/java/org/elasticsearch/discovery/azure/classic/AzureMinimumMasterNodesTests.java index 72e1f2da791..f8b105884bf 100644 --- a/plugins/discovery-azure-classic/src/test/java/org/elasticsearch/discovery/azure/classic/AzureMinimumMasterNodesTests.java +++ b/plugins/discovery-azure-classic/src/test/java/org/elasticsearch/discovery/azure/classic/AzureMinimumMasterNodesTests.java @@ -39,7 +39,8 @@ import static org.hamcrest.Matchers.nullValue; @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.SUITE, numDataNodes = 0, transportClientRatio = 0.0, - numClientNodes = 0) + numClientNodes = 0, + autoMinMasterNodes = false) @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch-cloud-azure/issues/89") public class AzureMinimumMasterNodesTests extends AbstractAzureComputeServiceTestCase { diff --git a/plugins/discovery-ec2/src/test/java/org/elasticsearch/discovery/ec2/Ec2DiscoveryUpdateSettingsTests.java b/plugins/discovery-ec2/src/test/java/org/elasticsearch/discovery/ec2/Ec2DiscoveryUpdateSettingsTests.java index cdb45bfe087..41f5af48eb4 100644 --- a/plugins/discovery-ec2/src/test/java/org/elasticsearch/discovery/ec2/Ec2DiscoveryUpdateSettingsTests.java +++ b/plugins/discovery-ec2/src/test/java/org/elasticsearch/discovery/ec2/Ec2DiscoveryUpdateSettingsTests.java @@ -33,7 +33,7 @@ import static org.hamcrest.CoreMatchers.is; * starting. * This test requires AWS to run. */ -@ClusterScope(scope = Scope.TEST, numDataNodes = 0, numClientNodes = 0, transportClientRatio = 0.0) +@ClusterScope(scope = Scope.TEST, numDataNodes = 0, numClientNodes = 0, transportClientRatio = 0.0, autoMinMasterNodes = false) public class Ec2DiscoveryUpdateSettingsTests extends AbstractAwsTestCase { public void testMinimumMasterNodesStart() { Settings nodeSettings = Settings.builder() diff --git a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java index 82e7ce072e0..29f77bc166a 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java @@ -149,6 +149,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.IdentityHashMap; import java.util.List; @@ -178,6 +179,7 @@ import static org.elasticsearch.test.XContentTestUtils.differenceBetweenMapsIgno import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoTimeout; +import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.emptyArray; import static org.hamcrest.Matchers.emptyIterable; import static org.hamcrest.Matchers.equalTo; @@ -527,10 +529,15 @@ public abstract class ESIntegTestCase extends ESTestCase { if (cluster() != null) { if (currentClusterScope != Scope.TEST) { MetaData metaData = client().admin().cluster().prepareState().execute().actionGet().getState().getMetaData(); - assertThat("test leaves persistent cluster metadata behind: " + metaData.persistentSettings().getAsMap(), metaData - .persistentSettings().getAsMap().size(), equalTo(0)); - assertThat("test leaves transient cluster metadata behind: " + metaData.transientSettings().getAsMap(), metaData - .transientSettings().getAsMap().size(), equalTo(0)); + final Map persistent = metaData.persistentSettings().getAsMap(); + assertThat("test leaves persistent cluster metadata behind: " + persistent, persistent.size(), equalTo(0)); + final Map transientSettings = new HashMap<>(metaData.transientSettings().getAsMap()); + if (isInternalCluster() && internalCluster().getAutoManageMinMasterNode()) { + // this is set by the test infra + transientSettings.remove(ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey()); + } + assertThat("test leaves transient cluster metadata behind: " + transientSettings, + transientSettings.keySet(), empty()); } ensureClusterSizeConsistency(); ensureClusterStateConsistency(); @@ -1518,6 +1525,12 @@ public abstract class ESIntegTestCase extends ESTestCase { */ boolean supportsDedicatedMasters() default true; + /** + * The cluster automatically manages the {@link ElectMasterService#DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING} by default + * as nodes are started and stopped. Set this to false to manage the setting manually. + */ + boolean autoMinMasterNodes() default true; + /** * Returns the number of client nodes in the cluster. Default is {@link InternalTestCluster#DEFAULT_NUM_CLIENT_NODES}, a * negative value means that the number of client nodes will be randomized. @@ -1615,6 +1628,11 @@ public abstract class ESIntegTestCase extends ESTestCase { return annotation == null ? true : annotation.supportsDedicatedMasters(); } + private boolean getAutoMinMasterNodes() { + ClusterScope annotation = getAnnotation(this.getClass(), ClusterScope.class); + return annotation == null ? true : annotation.autoMinMasterNodes(); + } + private int getNumDataNodes() { ClusterScope annotation = getAnnotation(this.getClass(), ClusterScope.class); return annotation == null ? -1 : annotation.numDataNodes(); @@ -1753,7 +1771,8 @@ public abstract class ESIntegTestCase extends ESTestCase { } mockPlugins = mocks; } - return new InternalTestCluster(seed, createTempDir(), supportsDedicatedMasters, minNumDataNodes, maxNumDataNodes, + return new InternalTestCluster(seed, createTempDir(), supportsDedicatedMasters, getAutoMinMasterNodes(), + minNumDataNodes, maxNumDataNodes, InternalTestCluster.clusterName(scope.name(), seed) + "-cluster", nodeConfigurationSource, getNumClientNodes(), InternalTestCluster.DEFAULT_ENABLE_HTTP_PIPELINING, nodePrefix, mockPlugins, getClientWrapper()); } diff --git a/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java index 69c59086980..1d3b24b3ce2 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java @@ -444,7 +444,6 @@ public abstract class ESTestCase extends LuceneTestCase { return RandomPicks.randomFrom(random, array); } - /** Pick a random object from the given list. */ public static T randomFrom(List list) { return RandomPicks.randomFrom(random(), list); @@ -452,7 +451,12 @@ public abstract class ESTestCase extends LuceneTestCase { /** Pick a random object from the given collection. */ public static T randomFrom(Collection collection) { - return RandomPicks.randomFrom(random(), collection); + return randomFrom(random(), collection); + } + + /** Pick a random object from the given collection. */ + public static T randomFrom(Random random, Collection collection) { + return RandomPicks.randomFrom(random, collection); } public static String randomAsciiOfLengthBetween(int minCodeUnits, int maxCodeUnits) { diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java index 37e3a58295e..3c29b878e7a 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java +++ b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java @@ -24,12 +24,10 @@ import com.carrotsearch.randomizedtesting.SysGlobals; import com.carrotsearch.randomizedtesting.generators.RandomNumbers; import com.carrotsearch.randomizedtesting.generators.RandomPicks; import com.carrotsearch.randomizedtesting.generators.RandomStrings; - import org.apache.logging.log4j.Logger; import org.apache.lucene.store.StoreRateLimiting; import org.apache.lucene.util.IOUtils; import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.Version; import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.admin.cluster.node.stats.NodeStats; import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse; @@ -68,7 +66,8 @@ import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; -import org.elasticsearch.discovery.DiscoverySettings; +import org.elasticsearch.discovery.zen.ElectMasterService; +import org.elasticsearch.discovery.zen.ZenDiscovery; import org.elasticsearch.env.Environment; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.env.ShardLockObtainFailedException; @@ -133,8 +132,10 @@ import java.util.stream.Stream; import static org.apache.lucene.util.LuceneTestCase.TEST_NIGHTLY; import static org.apache.lucene.util.LuceneTestCase.rarely; +import static org.elasticsearch.discovery.DiscoverySettings.INITIAL_STATE_TIMEOUT_SETTING; import static org.elasticsearch.test.ESTestCase.assertBusy; -import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoTimeout; +import static org.elasticsearch.test.ESTestCase.randomFrom; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.greaterThanOrEqualTo; @@ -225,6 +226,8 @@ public final class InternalTestCluster extends TestCluster { private final ExecutorService executor; + private final boolean autoManageMinMasterNodes; + private final Collection> mockPlugins; /** @@ -238,9 +241,10 @@ public final class InternalTestCluster extends TestCluster { public InternalTestCluster(long clusterSeed, Path baseDir, boolean randomlyAddDedicatedMasters, - int minNumDataNodes, int maxNumDataNodes, String clusterName, NodeConfigurationSource nodeConfigurationSource, int numClientNodes, + boolean autoManageMinMasterNodes, int minNumDataNodes, int maxNumDataNodes, String clusterName, NodeConfigurationSource nodeConfigurationSource, int numClientNodes, boolean enableHttpPipelining, String nodePrefix, Collection> mockPlugins, Function clientWrapper) { super(clusterSeed); + this.autoManageMinMasterNodes = autoManageMinMasterNodes; this.clientWrapper = clientWrapper; this.baseDir = baseDir; this.clusterName = clusterName; @@ -345,6 +349,11 @@ public final class InternalTestCluster extends TestCluster { return clusterName; } + /** returns true if the {@link ElectMasterService#DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING} setting is auto managed by this cluster */ + public boolean getAutoManageMinMasterNode() { + return autoManageMinMasterNodes; + } + public String[] getNodeNames() { return nodes.keySet().toArray(Strings.EMPTY_ARRAY); } @@ -466,7 +475,7 @@ public final class InternalTestCluster extends TestCluster { if (randomNodeAndClient != null) { return randomNodeAndClient; } - NodeAndClient buildNode = buildNode(); + NodeAndClient buildNode = buildNode(1); buildNode.startNode(); publishNode(buildNode); return buildNode; @@ -496,30 +505,20 @@ public final class InternalTestCluster extends TestCluster { * if more nodes than n are present this method will not * stop any of the running nodes. */ - public void ensureAtLeastNumDataNodes(int n) { - final List> asyncs = new ArrayList<>(); - synchronized (this) { - int size = numDataNodes(); - for (int i = size; i < n; i++) { - logger.info("increasing cluster size from {} to {}", size, n); - if (numSharedDedicatedMasterNodes > 0) { - asyncs.add(startDataOnlyNodeAsync()); - } else { - asyncs.add(startNodeAsync()); - } + public synchronized void ensureAtLeastNumDataNodes(int n) { + boolean added = false; + int size = numDataNodes(); + for (int i = size; i < n; i++) { + logger.info("increasing cluster size from {} to {}", size, n); + added = true; + if (numSharedDedicatedMasterNodes > 0) { + startDataOnlyNode(Settings.EMPTY); + } else { + startNode(Settings.EMPTY); } } - try { - for (Async async : asyncs) { - async.get(); - } - } catch (Exception e) { - throw new ElasticsearchException("failed to start nodes", e); - } - if (!asyncs.isEmpty()) { - synchronized (this) { - assertNoTimeout(client().admin().cluster().prepareHealth().setWaitForNodes(Integer.toString(nodes.size())).get()); - } + if (added) { + validateClusterFormed(); } } @@ -544,28 +543,47 @@ public final class InternalTestCluster extends TestCluster { while (values.hasNext() && numNodesAndClients++ < size - n) { NodeAndClient next = values.next(); nodesToRemove.add(next); - removeDisruptionSchemeFromNode(next); - next.close(); - } - for (NodeAndClient toRemove : nodesToRemove) { - nodes.remove(toRemove.name); } + + stopNodesAndClients(nodesToRemove); if (!nodesToRemove.isEmpty() && size() > 0) { - assertNoTimeout(client().admin().cluster().prepareHealth().setWaitForNodes(Integer.toString(nodes.size())).get()); + validateClusterFormed(); } } - private NodeAndClient buildNode(Settings settings) { + /** + * builds a new node given the settings. + * + * @param settings the settings to use + * @param defaultMinMasterNodes min_master_nodes value to use if min_master_nodes is auto managed + */ + private NodeAndClient buildNode(Settings settings, int defaultMinMasterNodes) { int ord = nextNodeId.getAndIncrement(); - return buildNode(ord, random.nextLong(), settings, false); + return buildNode(ord, random.nextLong(), settings, false, defaultMinMasterNodes); } - private NodeAndClient buildNode() { + /** + * builds a new node with default settings + * + * @param defaultMinMasterNodes min_master_nodes value to use if min_master_nodes is auto managed + */ + private NodeAndClient buildNode(int defaultMinMasterNodes) { int ord = nextNodeId.getAndIncrement(); - return buildNode(ord, random.nextLong(), null, false); + return buildNode(ord, random.nextLong(), null, false, defaultMinMasterNodes); } - private NodeAndClient buildNode(int nodeId, long seed, Settings settings, boolean reuseExisting) { + /** + * builds a new node + * + * @param nodeId the node internal id (see {@link NodeAndClient#nodeAndClientId()} + * @param seed the node's random seed + * @param settings the settings to use + * @param reuseExisting if a node with the same name is already part of {@link #nodes}, no new node will be built and + * the method will return the existing one + * @param defaultMinMasterNodes min_master_nodes value to use if min_master_nodes is auto managed + */ + private NodeAndClient buildNode(int nodeId, long seed, Settings settings, + boolean reuseExisting, int defaultMinMasterNodes) { assert Thread.holdsLock(this); ensureOpen(); settings = getSettings(nodeId, seed, settings); @@ -577,13 +595,21 @@ public final class InternalTestCluster extends TestCluster { assert reuseExisting == true || nodes.containsKey(name) == false : "node name [" + name + "] already exists but not allowed to use it"; } - Settings finalSettings = Settings.builder() + Settings.Builder finalSettings = Settings.builder() .put(Environment.PATH_HOME_SETTING.getKey(), baseDir) // allow overriding path.home .put(settings) .put("node.name", name) - .put(NodeEnvironment.NODE_ID_SEED_SETTING.getKey(), seed) - .build(); - MockNode node = new MockNode(finalSettings, plugins); + .put(NodeEnvironment.NODE_ID_SEED_SETTING.getKey(), seed); + + if (autoManageMinMasterNodes) { + assert finalSettings.get(ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey()) == null : + "min master nodes may not be set when auto managed"; + finalSettings + // don't wait too long not to slow down tests + .put(ZenDiscovery.MASTER_ELECTION_WAIT_FOR_JOINS_TIMEOUT_SETTING.getKey(), "5s") + .put(ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey(), defaultMinMasterNodes); + } + MockNode node = new MockNode(finalSettings.build(), plugins); return new NodeAndClient(name, node, nodeId); } @@ -684,7 +710,7 @@ public final class InternalTestCluster extends TestCluster { .put(Node.NODE_DATA_SETTING.getKey(), false).put(Node.NODE_INGEST_SETTING.getKey(), false); if (size() == 0) { // if we are the first node - don't wait for a state - builder.put(DiscoverySettings.INITIAL_STATE_TIMEOUT_SETTING.getKey(), 0); + builder.put(INITIAL_STATE_TIMEOUT_SETTING.getKey(), 0); } return startNode(builder); } @@ -777,6 +803,10 @@ public final class InternalTestCluster extends TestCluster { return nodeAndClientId; } + public boolean isMasterEligible() { + return Node.NODE_MASTER_SETTING.get(node.settings()); + } + Client client(Random random) { if (closed.get()) { throw new RuntimeException("already closed"); @@ -844,21 +874,40 @@ public final class InternalTestCluster extends TestCluster { node.close(); } - void restart(RestartCallback callback, boolean clearDataIfNeeded) throws Exception { - assert callback != null; - resetClient(); + /** + * closes the current node if not already closed, builds a new node object using the current node settings and starts it + */ + void restart(RestartCallback callback, boolean clearDataIfNeeded, int minMasterNodes) throws Exception { if (!node.isClosed()) { closeNode(); } - Settings newSettings = callback.onNodeStopped(name); - if (newSettings == null) { - newSettings = Settings.EMPTY; + recreateNodeOnRestart(callback, clearDataIfNeeded, minMasterNodes); + startNode(); + } + + /** + * rebuilds a new node object using the current node settings and starts it + */ + void recreateNodeOnRestart(RestartCallback callback, boolean clearDataIfNeeded, int minMasterNodes) throws Exception { + assert callback != null; + Settings callbackSettings = callback.onNodeStopped(name); + Settings.Builder newSettings = Settings.builder(); + if (callbackSettings != null) { + newSettings.put(callbackSettings); } + if (minMasterNodes >= 0) { + assert ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.exists(newSettings.build()) == false : "min master nodes is auto managed"; + newSettings.put(ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey(), minMasterNodes).build(); + } + + // validation is (optionally) done in fullRestart/rollingRestart + newSettings.put(INITIAL_STATE_TIMEOUT_SETTING.getKey(), "0s"); if (clearDataIfNeeded) { clearDataIfNeeded(callback); } - createNewNode(newSettings); - startNode(); + createNewNode(newSettings.build()); + // make sure cached client points to new node + resetClient(); } private void clearDataIfNeeded(RestartCallback callback) throws IOException { @@ -948,22 +997,24 @@ public final class InternalTestCluster extends TestCluster { if (wipeData) { wipePendingDataDirectories(); } + if (nodes.size() > 0 && autoManageMinMasterNodes) { + updateMinMasterNodes(getMasterNodesCount()); + } logger.debug("Cluster hasn't changed - moving out - nodes: [{}] nextNodeId: [{}] numSharedNodes: [{}]", nodes.keySet(), nextNodeId.get(), newSize); return; } logger.debug("Cluster is NOT consistent - restarting shared nodes - nodes: [{}] nextNodeId: [{}] numSharedNodes: [{}]", nodes.keySet(), nextNodeId.get(), newSize); // trash all nodes with id >= sharedNodesSeeds.length - they are non shared - - + final List toClose = new ArrayList<>(); for (Iterator iterator = nodes.values().iterator(); iterator.hasNext();) { NodeAndClient nodeAndClient = iterator.next(); if (nodeAndClient.nodeAndClientId() >= sharedNodesSeeds.length) { logger.debug("Close Node [{}] not shared", nodeAndClient.name); - nodeAndClient.close(); - iterator.remove(); + toClose.add(nodeAndClient); } } + stopNodesAndClients(toClose); // clean up what the nodes left that is unused if (wipeData) { @@ -972,13 +1023,19 @@ public final class InternalTestCluster extends TestCluster { // start any missing node assert newSize == numSharedDedicatedMasterNodes + numSharedDataNodes + numSharedCoordOnlyNodes; + final int numberOfMasterNodes = numSharedDedicatedMasterNodes > 0 ? numSharedDedicatedMasterNodes : numSharedDataNodes; + final int defaultMinMasterNodes = (numberOfMasterNodes / 2) + 1; + final List toStartAndPublish = new ArrayList<>(); // we want to start nodes in one go due to min master nodes for (int i = 0; i < numSharedDedicatedMasterNodes; i++) { final Settings.Builder settings = Settings.builder(); - settings.put(Node.NODE_MASTER_SETTING.getKey(), true).build(); - settings.put(Node.NODE_DATA_SETTING.getKey(), false).build(); - NodeAndClient nodeAndClient = buildNode(i, sharedNodesSeeds[i], settings.build(), true); - nodeAndClient.startNode(); - publishNode(nodeAndClient); + settings.put(Node.NODE_MASTER_SETTING.getKey(), true); + settings.put(Node.NODE_DATA_SETTING.getKey(), false); + if (autoManageMinMasterNodes) { + settings.put(INITIAL_STATE_TIMEOUT_SETTING.getKey(), "0s"); // we wait at the end + } + + NodeAndClient nodeAndClient = buildNode(i, sharedNodesSeeds[i], settings.build(), true, defaultMinMasterNodes); + toStartAndPublish.add(nodeAndClient); } for (int i = numSharedDedicatedMasterNodes; i < numSharedDedicatedMasterNodes + numSharedDataNodes; i++) { final Settings.Builder settings = Settings.builder(); @@ -987,32 +1044,43 @@ public final class InternalTestCluster extends TestCluster { settings.put(Node.NODE_MASTER_SETTING.getKey(), false).build(); settings.put(Node.NODE_DATA_SETTING.getKey(), true).build(); } - NodeAndClient nodeAndClient = buildNode(i, sharedNodesSeeds[i], settings.build(), true); - nodeAndClient.startNode(); - publishNode(nodeAndClient); + if (autoManageMinMasterNodes) { + settings.put(INITIAL_STATE_TIMEOUT_SETTING.getKey(), "0s"); // we wait at the end + } + NodeAndClient nodeAndClient = buildNode(i, sharedNodesSeeds[i], settings.build(), true, defaultMinMasterNodes); + toStartAndPublish.add(nodeAndClient); } for (int i = numSharedDedicatedMasterNodes + numSharedDataNodes; i < numSharedDedicatedMasterNodes + numSharedDataNodes + numSharedCoordOnlyNodes; i++) { final Builder settings = Settings.builder().put(Node.NODE_MASTER_SETTING.getKey(), false) .put(Node.NODE_DATA_SETTING.getKey(), false).put(Node.NODE_INGEST_SETTING.getKey(), false); - NodeAndClient nodeAndClient = buildNode(i, sharedNodesSeeds[i], settings.build(), true); - nodeAndClient.startNode(); - publishNode(nodeAndClient); + NodeAndClient nodeAndClient = buildNode(i, sharedNodesSeeds[i], settings.build(), true, defaultMinMasterNodes); + toStartAndPublish.add(nodeAndClient); } + startAndPublishNodesAndClients(toStartAndPublish); + nextNodeId.set(newSize); assert size() == newSize; if (newSize > 0) { - ClusterHealthResponse response = client().admin().cluster().prepareHealth() - .setWaitForNodes(Integer.toString(newSize)).get(); - if (response.isTimedOut()) { - logger.warn("failed to wait for a cluster of size [{}], got [{}]", newSize, response); - throw new IllegalStateException("cluster failed to reach the expected size of [" + newSize + "]"); - } + validateClusterFormed(); } logger.debug("Cluster is consistent again - nodes: [{}] nextNodeId: [{}] numSharedNodes: [{}]", nodes.keySet(), nextNodeId.get(), newSize); } + /** ensure a cluster is form with {@link #nodes}.size() nodes. */ + private void validateClusterFormed() { + final int size = nodes.size(); + String name = randomFrom(random, getNodeNames()); + logger.trace("validating cluster formed via [{}], expecting [{}]", name, size); + final Client client = client(name); + ClusterHealthResponse response = client.admin().cluster().prepareHealth().setWaitForNodes(Integer.toString(size)).get(); + if (response.isTimedOut()) { + logger.warn("failed to wait for a cluster of size [{}], got [{}]", size, response); + throw new IllegalStateException("cluster failed to reach the expected size of [" + size + "]"); + } + } + @Override public synchronized void afterTest() throws IOException { wipePendingDataDirectories(); @@ -1234,9 +1302,7 @@ public final class InternalTestCluster extends TestCluster { NodeAndClient nodeAndClient = getRandomNodeAndClient(new DataNodePredicate()); if (nodeAndClient != null) { logger.info("Closing random node [{}] ", nodeAndClient.name); - removeDisruptionSchemeFromNode(nodeAndClient); - nodes.remove(nodeAndClient.name); - nodeAndClient.close(); + stopNodesAndClient(nodeAndClient); return true; } return false; @@ -1251,9 +1317,7 @@ public final class InternalTestCluster extends TestCluster { NodeAndClient nodeAndClient = getRandomNodeAndClient(nc -> filter.test(nc.node.settings())); if (nodeAndClient != null) { logger.info("Closing filtered random node [{}] ", nodeAndClient.name); - removeDisruptionSchemeFromNode(nodeAndClient); - nodes.remove(nodeAndClient.name); - nodeAndClient.close(); + stopNodesAndClient(nodeAndClient); } } @@ -1266,9 +1330,7 @@ public final class InternalTestCluster extends TestCluster { String masterNodeName = getMasterName(); assert nodes.containsKey(masterNodeName); logger.info("Closing master node [{}] ", masterNodeName); - removeDisruptionSchemeFromNode(nodes.get(masterNodeName)); - NodeAndClient remove = nodes.remove(masterNodeName); - remove.close(); + stopNodesAndClient(nodes.get(masterNodeName)); } /** @@ -1278,8 +1340,47 @@ public final class InternalTestCluster extends TestCluster { NodeAndClient nodeAndClient = getRandomNodeAndClient(new MasterNodePredicate(getMasterName()).negate()); if (nodeAndClient != null) { logger.info("Closing random non master node [{}] current master [{}] ", nodeAndClient.name, getMasterName()); + stopNodesAndClient(nodeAndClient); + } + } + + private synchronized void startAndPublishNodesAndClients(List nodeAndClients) { + if (nodeAndClients.size() > 0) { + final int newMasters = (int) nodeAndClients.stream().filter(NodeAndClient::isMasterEligible) + .filter(nac -> nodes.containsKey(nac.name) == false) // filter out old masters + .count(); + final int currentMasters = getMasterNodesCount(); + if (autoManageMinMasterNodes && currentMasters > 1 && newMasters > 0) { + // special case for 1 node master - we can't update the min master nodes before we add more nodes. + updateMinMasterNodes(currentMasters + newMasters); + } + for (NodeAndClient nodeAndClient : nodeAndClients) { + nodeAndClient.startNode(); + publishNode(nodeAndClient); + } + if (autoManageMinMasterNodes && currentMasters == 1 && newMasters > 0) { + // update once masters have joined + validateClusterFormed(); + updateMinMasterNodes(currentMasters + newMasters); + } + } + } + + private synchronized void stopNodesAndClient(NodeAndClient nodeAndClient) throws IOException { + stopNodesAndClients(Collections.singleton(nodeAndClient)); + } + + private synchronized void stopNodesAndClients(Collection nodeAndClients) throws IOException { + if (autoManageMinMasterNodes && nodeAndClients.size() > 0) { + int masters = (int)nodeAndClients.stream().filter(NodeAndClient::isMasterEligible).count(); + if (masters > 0) { + updateMinMasterNodes(getMasterNodesCount() - masters); + } + } + for (NodeAndClient nodeAndClient: nodeAndClients) { removeDisruptionSchemeFromNode(nodeAndClient); - nodes.remove(nodeAndClient.name); + NodeAndClient previous = nodes.remove(nodeAndClient.name); + assert previous == nodeAndClient; nodeAndClient.close(); } } @@ -1319,8 +1420,7 @@ public final class InternalTestCluster extends TestCluster { ensureOpen(); NodeAndClient nodeAndClient = getRandomNodeAndClient(predicate); if (nodeAndClient != null) { - logger.info("Restarting random node [{}] ", nodeAndClient.name); - nodeAndClient.restart(callback, true); + restartNode(nodeAndClient, callback); } } @@ -1331,93 +1431,10 @@ public final class InternalTestCluster extends TestCluster { ensureOpen(); NodeAndClient nodeAndClient = nodes.get(nodeName); if (nodeAndClient != null) { - logger.info("Restarting node [{}] ", nodeAndClient.name); - nodeAndClient.restart(callback, true); + restartNode(nodeAndClient, callback); } } - private synchronized void restartAllNodes(boolean rollingRestart, RestartCallback callback) throws Exception { - ensureOpen(); - List toRemove = new ArrayList<>(); - try { - for (NodeAndClient nodeAndClient : nodes.values()) { - if (!callback.doRestart(nodeAndClient.name)) { - logger.info("Closing node [{}] during restart", nodeAndClient.name); - toRemove.add(nodeAndClient); - if (activeDisruptionScheme != null) { - activeDisruptionScheme.removeFromNode(nodeAndClient.name, this); - } - nodeAndClient.close(); - } - } - } finally { - for (NodeAndClient nodeAndClient : toRemove) { - nodes.remove(nodeAndClient.name); - } - } - logger.info("Restarting remaining nodes rollingRestart [{}]", rollingRestart); - if (rollingRestart) { - int numNodesRestarted = 0; - for (NodeAndClient nodeAndClient : nodes.values()) { - callback.doAfterNodes(numNodesRestarted++, nodeAndClient.nodeClient()); - logger.info("Restarting node [{}] ", nodeAndClient.name); - if (activeDisruptionScheme != null) { - activeDisruptionScheme.removeFromNode(nodeAndClient.name, this); - } - nodeAndClient.restart(callback, true); - if (activeDisruptionScheme != null) { - activeDisruptionScheme.applyToNode(nodeAndClient.name, this); - } - } - } else { - int numNodesRestarted = 0; - Set[] nodesRoleOrder = new Set[nextNodeId.get()]; - Map, List> nodesByRoles = new HashMap<>(); - for (NodeAndClient nodeAndClient : nodes.values()) { - callback.doAfterNodes(numNodesRestarted++, nodeAndClient.nodeClient()); - logger.info("Stopping node [{}] ", nodeAndClient.name); - if (activeDisruptionScheme != null) { - activeDisruptionScheme.removeFromNode(nodeAndClient.name, this); - } - nodeAndClient.closeNode(); - // delete data folders now, before we start other nodes that may claim it - nodeAndClient.clearDataIfNeeded(callback); - - DiscoveryNode discoveryNode = getInstanceFromNode(ClusterService.class, nodeAndClient.node()).localNode(); - nodesRoleOrder[nodeAndClient.nodeAndClientId()] = discoveryNode.getRoles(); - nodesByRoles.computeIfAbsent(discoveryNode.getRoles(), k -> new ArrayList<>()).add(nodeAndClient); - } - - assert nodesByRoles.values().stream().collect(Collectors.summingInt(List::size)) == nodes.size(); - - // randomize start up order, but making sure that: - // 1) A data folder that was assigned to a data node will stay so - // 2) Data nodes will get the same node lock ordinal range, so custom index paths (where the ordinal is used) - // will still belong to data nodes - for (List sameRoleNodes : nodesByRoles.values()) { - Collections.shuffle(sameRoleNodes, random); - } - - for (Set roles : nodesRoleOrder) { - if (roles == null) { - // if some nodes were stopped, we want have a role for them - continue; - } - NodeAndClient nodeAndClient = nodesByRoles.get(roles).remove(0); - logger.info("Starting node [{}] ", nodeAndClient.name); - if (activeDisruptionScheme != null) { - activeDisruptionScheme.removeFromNode(nodeAndClient.name, this); - } - // we already cleared data folders, before starting nodes up - nodeAndClient.restart(callback, false); - if (activeDisruptionScheme != null) { - activeDisruptionScheme.applyToNode(nodeAndClient.name, this); - } - } - } - } - - public static final RestartCallback EMPTY_CALLBACK = new RestartCallback() { @Override public Settings onNodeStopped(String node) { @@ -1442,15 +1459,98 @@ public final class InternalTestCluster extends TestCluster { /** * Restarts all nodes in a rolling restart fashion ie. only restarts on node a time. */ - public void rollingRestart(RestartCallback function) throws Exception { - restartAllNodes(true, function); + public synchronized void rollingRestart(RestartCallback callback) throws Exception { + int numNodesRestarted = 0; + for (NodeAndClient nodeAndClient : nodes.values()) { + callback.doAfterNodes(numNodesRestarted++, nodeAndClient.nodeClient()); + restartNode(nodeAndClient, callback); + } + } + + private void restartNode(NodeAndClient nodeAndClient, RestartCallback callback) throws Exception { + logger.info("Restarting node [{}] ", nodeAndClient.name); + if (activeDisruptionScheme != null) { + activeDisruptionScheme.removeFromNode(nodeAndClient.name, this); + } + final int masterNodesCount = getMasterNodesCount(); + // special case to allow stopping one node in a two node cluster and keep it functional + final boolean updateMinMaster = nodeAndClient.isMasterEligible() && masterNodesCount == 2 && autoManageMinMasterNodes; + if (updateMinMaster) { + updateMinMasterNodes(masterNodesCount - 1); + } + nodeAndClient.restart(callback, true, autoManageMinMasterNodes ? getMinMasterNodes(masterNodesCount) : -1); + if (activeDisruptionScheme != null) { + activeDisruptionScheme.applyToNode(nodeAndClient.name, this); + } + if (callback.validateClusterForming() || updateMinMaster) { + // we have to validate cluster size if updateMinMaster == true, because we need the + // second node to join in order to increment min_master_nodes back to 2. + validateClusterFormed(); + } + if (updateMinMaster) { + updateMinMasterNodes(masterNodesCount); + } } /** * Restarts all nodes in the cluster. It first stops all nodes and then restarts all the nodes again. */ - public void fullRestart(RestartCallback function) throws Exception { - restartAllNodes(false, function); + public synchronized void fullRestart(RestartCallback callback) throws Exception { + int numNodesRestarted = 0; + Map, List> nodesByRoles = new HashMap<>(); + Set[] rolesOrderedByOriginalStartupOrder = new Set[nextNodeId.get()]; + for (NodeAndClient nodeAndClient : nodes.values()) { + callback.doAfterNodes(numNodesRestarted++, nodeAndClient.nodeClient()); + logger.info("Stopping node [{}] ", nodeAndClient.name); + if (activeDisruptionScheme != null) { + activeDisruptionScheme.removeFromNode(nodeAndClient.name, this); + } + nodeAndClient.closeNode(); + // delete data folders now, before we start other nodes that may claim it + nodeAndClient.clearDataIfNeeded(callback); + DiscoveryNode discoveryNode = getInstanceFromNode(ClusterService.class, nodeAndClient.node()).localNode(); + rolesOrderedByOriginalStartupOrder[nodeAndClient.nodeAndClientId] = discoveryNode.getRoles(); + nodesByRoles.computeIfAbsent(discoveryNode.getRoles(), k -> new ArrayList<>()).add(nodeAndClient); + } + + assert nodesByRoles.values().stream().collect(Collectors.summingInt(List::size)) == nodes.size(); + + // randomize start up order, but making sure that: + // 1) A data folder that was assigned to a data node will stay so + // 2) Data nodes will get the same node lock ordinal range, so custom index paths (where the ordinal is used) + // will still belong to data nodes + for (List sameRoleNodes : nodesByRoles.values()) { + Collections.shuffle(sameRoleNodes, random); + } + List startUpOrder = new ArrayList<>(); + for (Set roles : rolesOrderedByOriginalStartupOrder) { + if (roles == null) { + // if some nodes were stopped, we want have a role for that ordinal + continue; + } + final List nodesByRole = nodesByRoles.get(roles); + startUpOrder.add(nodesByRole.remove(0)); + } + assert nodesByRoles.values().stream().collect(Collectors.summingInt(List::size)) == 0; + + // do two rounds to minimize pinging (mock zen pings pings with no delay and can create a lot of logs) + for (NodeAndClient nodeAndClient : startUpOrder) { + logger.info("resetting node [{}] ", nodeAndClient.name); + // we already cleared data folders, before starting nodes up + nodeAndClient.recreateNodeOnRestart(callback, false, autoManageMinMasterNodes ? getMinMasterNodes(getMasterNodesCount()) : -1); + } + + for (NodeAndClient nodeAndClient : startUpOrder) { + logger.info("starting node [{}] ", nodeAndClient.name); + nodeAndClient.startNode(); + if (activeDisruptionScheme != null) { + activeDisruptionScheme.applyToNode(nodeAndClient.name, this); + } + } + + if (callback.validateClusterForming()) { + validateClusterFormed(); + } } @@ -1534,19 +1634,51 @@ public final class InternalTestCluster extends TestCluster { * Starts a node with the given settings and returns it's name. */ public synchronized String startNode(Settings settings) { - NodeAndClient buildNode = buildNode(settings); - buildNode.startNode(); - publishNode(buildNode); + final int defaultMinMasterNodes = getMinMasterNodes(getMasterNodesCount() + (Node.NODE_MASTER_SETTING.get(settings) ? 1 : 0)); + NodeAndClient buildNode = buildNode(settings, defaultMinMasterNodes); + startAndPublishNodesAndClients(Collections.singletonList(buildNode)); return buildNode.name; } + /** + * updates the min master nodes setting in the current running cluster. + * + * @param eligibleMasterNodeCount the number of master eligible nodes to use as basis for the min master node setting + */ + private int updateMinMasterNodes(int eligibleMasterNodeCount) { + assert autoManageMinMasterNodes; + final int minMasterNodes = getMinMasterNodes(eligibleMasterNodeCount); + if (getMasterNodesCount() > 0) { + // there should be at least one master to update + logger.debug("updating min_master_nodes to [{}]", minMasterNodes); + try { + assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings( + Settings.builder().put(ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey(), minMasterNodes) + )); + } catch (Exception e) { + throw new ElasticsearchException("failed to update minimum master node to [{}] (current masters [{}])", e, + minMasterNodes, getMasterNodesCount()); + } + } + return minMasterNodes; + } + + /** calculates a min master nodes value based on the given number of master nodes */ + private int getMinMasterNodes(int eligibleMasterNodes) { + return eligibleMasterNodes / 2 + 1; + } + + private int getMasterNodesCount() { + return (int)nodes.values().stream().filter(n -> Node.NODE_MASTER_SETTING.get(n.node().settings())).count(); + } + public synchronized Async> startMasterOnlyNodesAsync(int numNodes) { return startMasterOnlyNodesAsync(numNodes, Settings.EMPTY); } public synchronized Async> startMasterOnlyNodesAsync(int numNodes, Settings settings) { Settings settings1 = Settings.builder().put(settings).put(Node.NODE_MASTER_SETTING.getKey(), true).put(Node.NODE_DATA_SETTING.getKey(), false).build(); - return startNodesAsync(numNodes, settings1, Version.CURRENT); + return startNodesAsync(numNodes, settings1); } public synchronized Async> startDataOnlyNodesAsync(int numNodes) { @@ -1555,7 +1687,7 @@ public final class InternalTestCluster extends TestCluster { public synchronized Async> startDataOnlyNodesAsync(int numNodes, Settings settings) { Settings settings1 = Settings.builder().put(settings).put(Node.NODE_MASTER_SETTING.getKey(), false).put(Node.NODE_DATA_SETTING.getKey(), true).build(); - return startNodesAsync(numNodes, settings1, Version.CURRENT); + return startNodesAsync(numNodes, settings1); } public synchronized Async startMasterOnlyNodeAsync() { @@ -1564,7 +1696,7 @@ public final class InternalTestCluster extends TestCluster { public synchronized Async startMasterOnlyNodeAsync(Settings settings) { Settings settings1 = Settings.builder().put(settings).put(Node.NODE_MASTER_SETTING.getKey(), true).put(Node.NODE_DATA_SETTING.getKey(), false).build(); - return startNodeAsync(settings1, Version.CURRENT); + return startNodeAsync(settings1); } public synchronized String startMasterOnlyNode(Settings settings) { @@ -1578,7 +1710,7 @@ public final class InternalTestCluster extends TestCluster { public synchronized Async startDataOnlyNodeAsync(Settings settings) { Settings settings1 = Settings.builder().put(settings).put(Node.NODE_MASTER_SETTING.getKey(), false).put(Node.NODE_DATA_SETTING.getKey(), true).build(); - return startNodeAsync(settings1, Version.CURRENT); + return startNodeAsync(settings1); } public synchronized String startDataOnlyNode(Settings settings) { @@ -1590,21 +1722,25 @@ public final class InternalTestCluster extends TestCluster { * Starts a node in an async manner with the given settings and returns future with its name. */ public synchronized Async startNodeAsync() { - return startNodeAsync(Settings.EMPTY, Version.CURRENT); + return startNodeAsync(Settings.EMPTY); } /** * Starts a node in an async manner with the given settings and returns future with its name. */ public synchronized Async startNodeAsync(final Settings settings) { - return startNodeAsync(settings, Version.CURRENT); + final int defaultMinMasterNodes; + if (autoManageMinMasterNodes) { + int mastersDelta = Node.NODE_MASTER_SETTING.get(settings) ? 1 : 0; + defaultMinMasterNodes = updateMinMasterNodes(getMasterNodesCount() + mastersDelta); + } else { + defaultMinMasterNodes = -1; + } + return startNodeAsync(settings, defaultMinMasterNodes); } - /** - * Starts a node in an async manner with the given settings and version and returns future with its name. - */ - public synchronized Async startNodeAsync(final Settings settings, final Version version) { - final NodeAndClient buildNode = buildNode(settings); + private synchronized Async startNodeAsync(final Settings settings, int defaultMinMasterNodes) { + final NodeAndClient buildNode = buildNode(settings, defaultMinMasterNodes); final Future submit = executor.submit(() -> { buildNode.startNode(); publishNode(buildNode); @@ -1613,27 +1749,28 @@ public final class InternalTestCluster extends TestCluster { return () -> submit.get(); } + /** * Starts multiple nodes in an async manner and returns future with its name. */ public synchronized Async> startNodesAsync(final int numNodes) { - return startNodesAsync(numNodes, Settings.EMPTY, Version.CURRENT); + return startNodesAsync(numNodes, Settings.EMPTY); } /** * Starts multiple nodes in an async manner with the given settings and returns future with its name. */ - public synchronized Async> startNodesAsync(final int numNodes, final Settings settings) { - return startNodesAsync(numNodes, settings, Version.CURRENT); - } - - /** - * Starts multiple nodes in an async manner with the given settings and version and returns future with its name. - */ - public synchronized Async> startNodesAsync(final int numNodes, final Settings settings, final Version version) { + public synchronized Async> startNodesAsync(final int numNodes, final Settings settings) { + final int defaultMinMasterNodes; + if (autoManageMinMasterNodes) { + int mastersDelta = Node.NODE_MASTER_SETTING.get(settings) ? numNodes : 0; + defaultMinMasterNodes = updateMinMasterNodes(getMasterNodesCount() + mastersDelta); + } else { + defaultMinMasterNodes = -1; + } final List> asyncs = new ArrayList<>(); for (int i = 0; i < numNodes; i++) { - asyncs.add(startNodeAsync(settings, version)); + asyncs.add(startNodeAsync(settings, defaultMinMasterNodes)); } return () -> { @@ -1650,9 +1787,16 @@ public final class InternalTestCluster extends TestCluster { * The order of the node names returned matches the order of the settings provided. */ public synchronized Async> startNodesAsync(final Settings... settings) { + final int defaultMinMasterNodes; + if (autoManageMinMasterNodes) { + int mastersDelta = (int) Stream.of(settings).filter(Node.NODE_MASTER_SETTING::get).count(); + defaultMinMasterNodes = updateMinMasterNodes(getMasterNodesCount() + mastersDelta); + } else { + defaultMinMasterNodes = -1; + } List> asyncs = new ArrayList<>(); for (Settings setting : settings) { - asyncs.add(startNodeAsync(setting, Version.CURRENT)); + asyncs.add(startNodeAsync(setting, defaultMinMasterNodes)); } return () -> { List ids = new ArrayList<>(); @@ -1683,6 +1827,11 @@ public final class InternalTestCluster extends TestCluster { return dataAndMasterNodes().size(); } + public synchronized int numMasterNodes() { + return filterNodes(nodes, NodeAndClient::isMasterEligible).size(); + } + + public void setDisruptionScheme(ServiceDisruptionScheme scheme) { clearDisruptionScheme(); scheme.applyToCluster(this); @@ -1887,14 +2036,8 @@ public final class InternalTestCluster extends TestCluster { return false; } - - /** - * If this returns false the node with the given node name will not be restarted. It will be - * closed and removed from the cluster. Returns true by default. - */ - public boolean doRestart(String nodeName) { - return true; - } + /** returns true if the restart should also validate the cluster has reformed */ + public boolean validateClusterForming() { return true; } } public Settings getDefaultSettings() { diff --git a/test/framework/src/main/java/org/elasticsearch/test/discovery/MockZenPing.java b/test/framework/src/main/java/org/elasticsearch/test/discovery/MockZenPing.java index c544b2bad88..fe16f034116 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/discovery/MockZenPing.java +++ b/test/framework/src/main/java/org/elasticsearch/test/discovery/MockZenPing.java @@ -18,11 +18,6 @@ */ package org.elasticsearch.test.discovery; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.stream.Collectors; - import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.common.component.AbstractComponent; @@ -32,13 +27,22 @@ import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.discovery.zen.PingContextProvider; import org.elasticsearch.discovery.zen.ZenPing; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + /** * A {@link ZenPing} implementation which returns results based on an static in-memory map. This allows pinging * to be immediate and can be used to speed up tests. */ public final class MockZenPing extends AbstractComponent implements ZenPing { - static final Map> activeNodesPerCluster = ConcurrentCollections.newConcurrentMap(); + static final Map> activeNodesPerCluster = new HashMap<>(); + + /** a set of the last discovered pings. used to throttle busy spinning where MockZenPing will keep returning the same results */ + private Set lastDiscoveredPings = null; private volatile PingContextProvider contextProvider; @@ -50,18 +54,34 @@ public final class MockZenPing extends AbstractComponent implements ZenPing { public void start(PingContextProvider contextProvider) { this.contextProvider = contextProvider; assert contextProvider != null; - boolean added = getActiveNodesForCurrentCluster().add(this); - assert added; + synchronized (activeNodesPerCluster) { + boolean added = getActiveNodesForCurrentCluster().add(this); + assert added; + activeNodesPerCluster.notifyAll(); + } } @Override public void ping(PingListener listener, TimeValue timeout) { logger.info("pinging using mock zen ping"); - List responseList = getActiveNodesForCurrentCluster().stream() - .filter(p -> p != this) // remove this as pings are not expected to return the local node - .map(MockZenPing::getPingResponse) - .collect(Collectors.toList()); - listener.onPing(responseList); + synchronized (activeNodesPerCluster) { + Set activeNodes = getActiveNodesForCurrentCluster(); + if (activeNodes.equals(lastDiscoveredPings)) { + try { + logger.trace("nothing has changed since the last ping. waiting for a change"); + activeNodesPerCluster.wait(timeout.millis()); + } catch (InterruptedException e) { + + } + activeNodes = getActiveNodesForCurrentCluster(); + } + lastDiscoveredPings = activeNodes; + List responseList = activeNodes.stream() + .filter(p -> p != this) // remove this as pings are not expected to return the local node + .map(MockZenPing::getPingResponse) + .collect(Collectors.toList()); + listener.onPing(responseList); + } } private ClusterName getClusterName() { @@ -74,13 +94,17 @@ public final class MockZenPing extends AbstractComponent implements ZenPing { } private Set getActiveNodesForCurrentCluster() { + assert Thread.holdsLock(activeNodesPerCluster); return activeNodesPerCluster.computeIfAbsent(getClusterName(), clusterName -> ConcurrentCollections.newConcurrentSet()); } @Override public void close() { - boolean found = getActiveNodesForCurrentCluster().remove(this); - assert found; + synchronized (activeNodesPerCluster) { + boolean found = getActiveNodesForCurrentCluster().remove(this); + assert found; + activeNodesPerCluster.notifyAll(); + } } } diff --git a/test/framework/src/main/java/org/elasticsearch/test/hamcrest/ElasticsearchAssertions.java b/test/framework/src/main/java/org/elasticsearch/test/hamcrest/ElasticsearchAssertions.java index 8cff517316b..bc560c9b0f0 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/hamcrest/ElasticsearchAssertions.java +++ b/test/framework/src/main/java/org/elasticsearch/test/hamcrest/ElasticsearchAssertions.java @@ -636,7 +636,7 @@ public class ElasticsearchAssertions { * a way that sucks less. */ NamedWriteableRegistry registry; - if (ESIntegTestCase.isInternalCluster()) { + if (ESIntegTestCase.isInternalCluster() && ESIntegTestCase.internalCluster().size() > 0) { registry = ESIntegTestCase.internalCluster().getInstance(NamedWriteableRegistry.class); } else { SearchModule searchModule = new SearchModule(Settings.EMPTY, false, emptyList()); diff --git a/test/framework/src/test/java/org/elasticsearch/test/test/InternalTestClusterTests.java b/test/framework/src/test/java/org/elasticsearch/test/test/InternalTestClusterTests.java index 327a49d3678..36903c7c608 100644 --- a/test/framework/src/test/java/org/elasticsearch/test/test/InternalTestClusterTests.java +++ b/test/framework/src/test/java/org/elasticsearch/test/test/InternalTestClusterTests.java @@ -19,21 +19,6 @@ */ package org.elasticsearch.test.test; -import java.io.IOException; -import java.nio.file.Files; -import java.nio.file.Path; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Random; -import java.util.Set; -import java.util.function.Function; -import java.util.stream.Collectors; - import org.apache.lucene.util.IOUtils; import org.apache.lucene.util.LuceneTestCase; import org.elasticsearch.client.Client; @@ -51,14 +36,32 @@ import org.elasticsearch.test.NodeConfigurationSource; import org.elasticsearch.test.discovery.TestZenDiscovery; import org.elasticsearch.transport.MockTcpTransportPlugin; import org.elasticsearch.transport.TransportSettings; +import org.hamcrest.Matcher; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.Set; +import java.util.function.Function; +import java.util.stream.Collectors; import static org.elasticsearch.cluster.node.DiscoveryNode.Role.DATA; import static org.elasticsearch.cluster.node.DiscoveryNode.Role.INGEST; import static org.elasticsearch.cluster.node.DiscoveryNode.Role.MASTER; +import static org.elasticsearch.discovery.zen.ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertFileExists; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertFileNotExists; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasEntry; +import static org.hamcrest.Matchers.hasKey; import static org.hamcrest.Matchers.not; /** @@ -81,10 +84,10 @@ public class InternalTestClusterTests extends ESTestCase { Path baseDir = createTempDir(); InternalTestCluster cluster0 = new InternalTestCluster(clusterSeed, baseDir, masterNodes, - minNumDataNodes, maxNumDataNodes, clusterName, nodeConfigurationSource, numClientNodes, + randomBoolean(), minNumDataNodes, maxNumDataNodes, clusterName, nodeConfigurationSource, numClientNodes, enableHttpPipelining, nodePrefix, Collections.emptyList(), Function.identity()); InternalTestCluster cluster1 = new InternalTestCluster(clusterSeed, baseDir, masterNodes, - minNumDataNodes, maxNumDataNodes, clusterName, nodeConfigurationSource, numClientNodes, + randomBoolean(), minNumDataNodes, maxNumDataNodes, clusterName, nodeConfigurationSource, numClientNodes, enableHttpPipelining, nodePrefix, Collections.emptyList(), Function.identity()); // TODO: this is not ideal - we should have a way to make sure ports are initialized in the same way assertClusters(cluster0, cluster1, false); @@ -116,7 +119,8 @@ public class InternalTestClusterTests extends ESTestCase { public static void assertSettings(Settings left, Settings right, boolean checkClusterUniqueSettings) { Set> entries0 = left.getAsMap().entrySet(); Map entries1 = right.getAsMap(); - assertThat(entries0.size(), equalTo(entries1.size())); + assertThat("--> left:\n" + left.toDelimitedString('\n') + "\n-->right:\n" + right.toDelimitedString('\n'), + entries0.size(), equalTo(entries1.size())); for (Map.Entry entry : entries0) { if (clusterUniqueSettings.contains(entry.getKey()) && checkClusterUniqueSettings == false) { continue; @@ -125,6 +129,41 @@ public class InternalTestClusterTests extends ESTestCase { } } + private void assertMMNinNodeSetting(InternalTestCluster cluster, int masterNodes) { + for (final String node : cluster.getNodeNames()) { + assertMMNinNodeSetting(node, cluster, masterNodes); + } + } + + private void assertMMNinNodeSetting(String node, InternalTestCluster cluster, int masterNodes) { + final int minMasterNodes = masterNodes / 2 + 1; + final Matcher> minMasterMatcher = + hasEntry(DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey(), Integer.toString(minMasterNodes)); + final Matcher> noMinMasterNodesMatcher = not(hasKey(DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey())); + Settings nodeSettings = cluster.client(node).admin().cluster().prepareNodesInfo(node).get().getNodes().get(0).getSettings(); + assertThat("node setting of node [" + node + "] has the wrong min_master_node setting: [" + + nodeSettings.get(DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey()) + "]", + nodeSettings.getAsMap(), + cluster.getAutoManageMinMasterNode() ? minMasterMatcher: noMinMasterNodesMatcher); + } + + private void assertMMNinClusterSetting(InternalTestCluster cluster, int masterNodes) { + final int minMasterNodes = masterNodes / 2 + 1; + Matcher> minMasterMatcher = + hasEntry(DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey(), Integer.toString(minMasterNodes)); + Matcher> noMinMasterNodesMatcher = not(hasKey(DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey())); + + for (final String node : cluster.getNodeNames()) { + Settings stateSettings = cluster.client(node).admin().cluster().prepareState().setLocal(true) + .get().getState().getMetaData().settings(); + + assertThat("dynamic setting for node [" + node + "] has the wrong min_master_node setting : [" + + stateSettings.get(DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey()) + "]", + stateSettings.getAsMap(), + cluster.getAutoManageMinMasterNode() ? minMasterMatcher: noMinMasterNodesMatcher); + } + } + public void testBeforeTest() throws Exception { long clusterSeed = randomLong(); boolean masterNodes = randomBoolean(); @@ -156,11 +195,12 @@ public class InternalTestClusterTests extends ESTestCase { Path baseDir = createTempDir(); final List> mockPlugins = Arrays.asList(MockTcpTransportPlugin.class, TestZenDiscovery.TestPlugin.class); + final boolean autoManageMinMasterNodes = randomBoolean(); InternalTestCluster cluster0 = new InternalTestCluster(clusterSeed, baseDir, masterNodes, - minNumDataNodes, maxNumDataNodes, clusterName1, nodeConfigurationSource, numClientNodes, + autoManageMinMasterNodes, minNumDataNodes, maxNumDataNodes, clusterName1, nodeConfigurationSource, numClientNodes, enableHttpPipelining, nodePrefix, mockPlugins, Function.identity()); InternalTestCluster cluster1 = new InternalTestCluster(clusterSeed, baseDir, masterNodes, - minNumDataNodes, maxNumDataNodes, clusterName2, nodeConfigurationSource, numClientNodes, + autoManageMinMasterNodes, minNumDataNodes, maxNumDataNodes, clusterName2, nodeConfigurationSource, numClientNodes, enableHttpPipelining, nodePrefix, mockPlugins, Function.identity()); assertClusters(cluster0, cluster1, false); @@ -182,6 +222,8 @@ public class InternalTestClusterTests extends ESTestCase { assertSettings(client.settings(), other.settings(), false); } assertArrayEquals(cluster0.getNodeNames(), cluster1.getNodeNames()); + assertMMNinNodeSetting(cluster0, cluster0.numMasterNodes()); + assertMMNinNodeSetting(cluster1, cluster0.numMasterNodes()); cluster0.afterTest(); cluster1.afterTest(); } finally { @@ -216,12 +258,15 @@ public class InternalTestClusterTests extends ESTestCase { boolean enableHttpPipelining = randomBoolean(); String nodePrefix = "test"; Path baseDir = createTempDir(); + final boolean autoManageMinMasterNodes = randomBoolean(); InternalTestCluster cluster = new InternalTestCluster(clusterSeed, baseDir, masterNodes, - minNumDataNodes, maxNumDataNodes, clusterName1, nodeConfigurationSource, numClientNodes, + autoManageMinMasterNodes, minNumDataNodes, maxNumDataNodes, clusterName1, nodeConfigurationSource, numClientNodes, enableHttpPipelining, nodePrefix, Arrays.asList(MockTcpTransportPlugin.class, TestZenDiscovery.TestPlugin.class), Function.identity()); try { cluster.beforeTest(random(), 0.0); + final int originalMasterCount = cluster.numMasterNodes(); + assertMMNinNodeSetting(cluster, originalMasterCount); final Map shardNodePaths = new HashMap<>(); for (String name: cluster.getNodeNames()) { shardNodePaths.put(name, getNodePaths(cluster, name)); @@ -230,7 +275,15 @@ public class InternalTestClusterTests extends ESTestCase { Path dataPath = getNodePaths(cluster, poorNode)[0]; final Path testMarker = dataPath.resolve("testMarker"); Files.createDirectories(testMarker); + int expectedMasterCount = originalMasterCount; + if (cluster.getInstance(ClusterService.class, poorNode).localNode().isMasterNode()) { + expectedMasterCount--; + } cluster.stopRandomNode(InternalTestCluster.nameFilter(poorNode)); + if (expectedMasterCount != originalMasterCount) { + // check for updated + assertMMNinClusterSetting(cluster, expectedMasterCount); + } assertFileExists(testMarker); // stopping a node half way shouldn't clean data final String stableNode = randomFrom(cluster.getNodeNames()); @@ -240,10 +293,17 @@ public class InternalTestClusterTests extends ESTestCase { Files.createDirectories(stableTestMarker); final String newNode1 = cluster.startNode(); + expectedMasterCount++; assertThat(getNodePaths(cluster, newNode1)[0], equalTo(dataPath)); assertFileExists(testMarker); // starting a node should re-use data folders and not clean it + if (expectedMasterCount > 1) { // this is the first master, it's in cluster state settings won't be updated + assertMMNinClusterSetting(cluster, expectedMasterCount); + } + assertMMNinNodeSetting(newNode1, cluster, expectedMasterCount); final String newNode2 = cluster.startNode(); + expectedMasterCount++; + assertMMNinClusterSetting(cluster, expectedMasterCount); final Path newDataPath = getNodePaths(cluster, newNode2)[0]; final Path newTestMarker = newDataPath.resolve("newTestMarker"); assertThat(newDataPath, not(dataPath)); @@ -262,6 +322,7 @@ public class InternalTestClusterTests extends ESTestCase { assertThat("data paths for " + name + " changed", getNodePaths(cluster, name), equalTo(shardNodePaths.get(name))); } + assertMMNinNodeSetting(cluster, originalMasterCount); } finally { cluster.close(); @@ -280,7 +341,7 @@ public class InternalTestClusterTests extends ESTestCase { public void testDifferentRolesMaintainPathOnRestart() throws Exception { final Path baseDir = createTempDir(); final int numNodes = 5; - InternalTestCluster cluster = new InternalTestCluster(randomLong(), baseDir, true, 0, 0, "test", + InternalTestCluster cluster = new InternalTestCluster(randomLong(), baseDir, true, true, 0, 0, "test", new NodeConfigurationSource() { @Override public Settings nodeSettings(int nodeOrdinal) { @@ -301,7 +362,9 @@ public class InternalTestClusterTests extends ESTestCase { try { Map> pathsPerRole = new HashMap<>(); for (int i = 0; i < numNodes; i++) { - final DiscoveryNode.Role role = randomFrom(MASTER, DiscoveryNode.Role.DATA, DiscoveryNode.Role.INGEST); + final DiscoveryNode.Role role = i == numNodes -1 && pathsPerRole.containsKey(MASTER) == false ? + MASTER : // last noe and still no master ofr the cluster + randomFrom(MASTER, DiscoveryNode.Role.DATA, DiscoveryNode.Role.INGEST); final String node; switch (role) { case MASTER: @@ -343,6 +406,59 @@ public class InternalTestClusterTests extends ESTestCase { } finally { cluster.close(); } + } + public void testTwoNodeCluster() throws Exception { + final boolean autoManageMinMasterNodes = randomBoolean(); + NodeConfigurationSource nodeConfigurationSource = new NodeConfigurationSource() { + @Override + public Settings nodeSettings(int nodeOrdinal) { + return Settings.builder().put(NetworkModule.HTTP_ENABLED.getKey(), false) + .put(NodeEnvironment.MAX_LOCAL_STORAGE_NODES_SETTING.getKey(), 2) + .put(NetworkModule.TRANSPORT_TYPE_KEY, MockTcpTransportPlugin.MOCK_TCP_TRANSPORT_NAME) + .build(); + } + + @Override + public Settings transportClientSettings() { + return Settings.builder() + .put(NetworkModule.TRANSPORT_TYPE_KEY, MockTcpTransportPlugin.MOCK_TCP_TRANSPORT_NAME).build(); + } + }; + boolean enableHttpPipelining = randomBoolean(); + String nodePrefix = "test"; + Path baseDir = createTempDir(); + InternalTestCluster cluster = new InternalTestCluster(randomLong(), baseDir, false, autoManageMinMasterNodes, 2, 2, + "test", nodeConfigurationSource, 0, enableHttpPipelining, nodePrefix, + Arrays.asList(MockTcpTransportPlugin.class, TestZenDiscovery.TestPlugin.class), Function.identity()); + try { + cluster.beforeTest(random(), 0.0); + assertMMNinNodeSetting(cluster, 2); + switch (randomInt(2)) { + case 0: + cluster.stopRandomDataNode(); + assertMMNinClusterSetting(cluster, 1); + cluster.startNode(); + assertMMNinClusterSetting(cluster, 2); + assertMMNinNodeSetting(cluster, 2); + break; + case 1: + cluster.rollingRestart(new InternalTestCluster.RestartCallback() { + @Override + public Settings onNodeStopped(String nodeName) throws Exception { + assertMMNinClusterSetting(cluster, 1); + return super.onNodeStopped(nodeName); + } + }); + assertMMNinClusterSetting(cluster, 2); + break; + case 2: + cluster.fullRestart(); + break; + } + assertMMNinNodeSetting(cluster, 2); + } finally { + cluster.close(); + } } }