From a2ceaa91cc8a3abc0732fab3aea6c1c82f01623b Mon Sep 17 00:00:00 2001
From: kimchy
Date: Tue, 20 Jul 2010 22:34:09 +0300
Subject: [PATCH] improve retry when closing the node performing an operation

---
 .../TransportMasterNodeOperationAction.java   |   5 +-
 ...nsportShardReplicationOperationAction.java |   6 +-
 .../service/InternalClusterService.java       |   4 +
 .../node/NodeCloseException.java              |  40 ++
 .../recovery/RecoveryWhileUnderLoadTests.java | 346 +++++++++++-------
 5 files changed, 264 insertions(+), 137 deletions(-)
 create mode 100644 modules/elasticsearch/src/main/java/org/elasticsearch/node/NodeCloseException.java

diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeOperationAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeOperationAction.java
index da8d2cf3766..abbb96b94f5 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeOperationAction.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeOperationAction.java
@@ -31,6 +31,7 @@
 import org.elasticsearch.cluster.TimeoutClusterStateListener;
 import org.elasticsearch.cluster.node.DiscoveryNodes;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.node.NodeCloseException;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.*;
@@ -78,7 +79,7 @@ public abstract class TransportMasterNodeOperationAction

     private void innerExecute(final Request request, final ActionListener<Response> listener, final boolean retrying) {
         final ClusterState clusterState = clusterService.state();
-        DiscoveryNodes nodes = clusterState.nodes();
+        final DiscoveryNodes nodes = clusterState.nodes();
         if (nodes.localNodeMaster()) {
             threadPool.execute(new Runnable() {
                 @Override public void run() {
@@ -123,7 +124,7 @@ public abstract class TransportMasterNodeOperationAction
(timerTimeout, notifyTimeout));
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/node/NodeCloseException.java b/modules/elasticsearch/src/main/java/org/elasticsearch/node/NodeCloseException.java
new file mode 100644
index 00000000000..9c83fc3d21f
--- /dev/null
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/node/NodeCloseException.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to Elastic Search and Shay Banon under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Elastic Search licenses this
+ * file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */ + +package org.elasticsearch.node; + +import org.elasticsearch.ElasticSearchException; +import org.elasticsearch.cluster.node.DiscoveryNode; + +/** + * @author kimchy (shay.banon) + */ +public class NodeCloseException extends ElasticSearchException { + + private final DiscoveryNode node; + + public NodeCloseException(DiscoveryNode node) { + super("node closed " + node); + this.node = node; + } + + public DiscoveryNode node() { + return node; + } +} diff --git a/modules/test/integration/src/test/java/org/elasticsearch/test/integration/recovery/RecoveryWhileUnderLoadTests.java b/modules/test/integration/src/test/java/org/elasticsearch/test/integration/recovery/RecoveryWhileUnderLoadTests.java index eb1ca91a707..d3b109fcf11 100644 --- a/modules/test/integration/src/test/java/org/elasticsearch/test/integration/recovery/RecoveryWhileUnderLoadTests.java +++ b/modules/test/integration/src/test/java/org/elasticsearch/test/integration/recovery/RecoveryWhileUnderLoadTests.java @@ -21,6 +21,8 @@ package org.elasticsearch.test.integration.recovery; import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus; import org.elasticsearch.common.collect.MapBuilder; +import org.elasticsearch.common.logging.ESLogger; +import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.test.integration.AbstractNodesTests; import org.testng.annotations.AfterMethod; import org.testng.annotations.Test; @@ -38,147 +40,37 @@ import static org.hamcrest.Matchers.*; */ public class RecoveryWhileUnderLoadTests extends AbstractNodesTests { + private final ESLogger logger = Loggers.getLogger(RecoveryWhileUnderLoadTests.class); + @AfterMethod public void shutdownNodes() { closeAllNodes(); } @Test public void recoverWhileUnderLoadAllocateBackupsTest() throws Exception { + logger.info("--> starting [node1] ..."); startNode("node1"); + logger.info("--> creating test index ..."); client("node1").admin().indices().prepareCreate("test").execute().actionGet(); final AtomicLong idGenerator = new AtomicLong(); final AtomicBoolean stop = new AtomicBoolean(false); Thread[] writers = new Thread[5]; final CountDownLatch stopLatch = new CountDownLatch(writers.length); + + logger.info("--> starting {} indexing threads", writers.length); for (int i = 0; i < writers.length; i++) { - writers[i] = new Thread() { - @Override public void run() { - while (!stop.get()) { - long id = idGenerator.incrementAndGet(); - client("node1").prepareIndex("test", "type1", Long.toString(id)) - .setSource(MapBuilder.newMapBuilder().put("test", "value" + id).map()).execute().actionGet(); - } - stopLatch.countDown(); - } - }; - writers[i].start(); - } - - while (client("node1").prepareCount().setQuery(matchAllQuery()).execute().actionGet().count() < 20000) { - Thread.sleep(100); - client("node1").admin().indices().prepareRefresh().execute().actionGet(); - } - - // now flush, just to make sure we have some data in the index, not just translog - client("node1").admin().indices().prepareFlush().execute().actionGet(); - - - while (client("node1").prepareCount().setQuery(matchAllQuery()).execute().actionGet().count() < 40000) { - Thread.sleep(100); - client("node1").admin().indices().prepareRefresh().execute().actionGet(); - } - - // now start another node, while we index - startNode("server2"); - - // make sure the cluster state is green, and all has been recovered - assertThat(client("node1").admin().cluster().prepareHealth().setTimeout("1m").setWaitForGreenStatus().execute().actionGet().status(), equalTo(ClusterHealthStatus.GREEN)); - - - 
// wait till we index 10,0000 - while (client("node1").prepareCount().setQuery(matchAllQuery()).execute().actionGet().count() < 100000) { - Thread.sleep(100); - client("node1").admin().indices().prepareRefresh().execute().actionGet(); - } - - stop.set(true); - stopLatch.await(); - - client("node1").admin().indices().prepareRefresh().execute().actionGet(); - for (int i = 0; i < 10; i++) { - assertThat(client("node1").prepareCount().setQuery(matchAllQuery()).execute().actionGet().count(), equalTo(idGenerator.get())); - } - } - - @Test public void recoverWhileUnderLoadAllocateBackupsRelocatePrimariesTest() throws Exception { - startNode("node1"); - - client("node1").admin().indices().prepareCreate("test").execute().actionGet(); - - final AtomicLong idGenerator = new AtomicLong(); - final AtomicBoolean stop = new AtomicBoolean(false); - Thread[] writers = new Thread[5]; - final CountDownLatch stopLatch = new CountDownLatch(writers.length); - for (int i = 0; i < writers.length; i++) { - writers[i] = new Thread() { - @Override public void run() { - while (!stop.get()) { - long id = idGenerator.incrementAndGet(); - client("node1").prepareIndex("test", "type1", Long.toString(id)) - .setSource(MapBuilder.newMapBuilder().put("test", "value" + id).map()).execute().actionGet(); - } - stopLatch.countDown(); - } - }; - writers[i].start(); - } - - while (client("node1").prepareCount().setQuery(matchAllQuery()).execute().actionGet().count() < 20000) { - Thread.sleep(100); - client("node1").admin().indices().prepareRefresh().execute().actionGet(); - } - - // now flush, just to make sure we have some data in the index, not just translog - client("node1").admin().indices().prepareFlush().execute().actionGet(); - - - while (client("node1").prepareCount().setQuery(matchAllQuery()).execute().actionGet().count() < 40000) { - Thread.sleep(100); - client("node1").admin().indices().prepareRefresh().execute().actionGet(); - } - - // now start another node, while we index - startNode("node2"); - startNode("node3"); - startNode("node4"); - - assertThat(client("node1").admin().cluster().prepareHealth().setTimeout("1m").setWaitForGreenStatus().execute().actionGet().status(), equalTo(ClusterHealthStatus.GREEN)); - - - while (client("node1").prepareCount().setQuery(matchAllQuery()).execute().actionGet().count() < 150000) { - Thread.sleep(100); - client("node1").admin().indices().prepareRefresh().execute().actionGet(); - } - - stop.set(true); - stopLatch.await(); - - client("node1").admin().indices().prepareRefresh().execute().actionGet(); - for (int i = 0; i < 10; i++) { - assertThat(client("node1").prepareCount().setQuery(matchAllQuery()).execute().actionGet().count(), equalTo(idGenerator.get())); - } - } - - @Test public void recoverWhileUnderLoadWithNodeShutdown() throws Exception { - startNode("node1"); - startNode("node2"); - - client("node1").admin().indices().prepareCreate("test").execute().actionGet(); - - final AtomicLong idGenerator = new AtomicLong(); - final AtomicBoolean stop = new AtomicBoolean(false); - Thread[] writers = new Thread[5]; - final CountDownLatch stopLatch = new CountDownLatch(writers.length); - for (int i = 0; i < writers.length; i++) { + final int indexerId = i; writers[i] = new Thread() { @Override public void run() { try { + logger.info("**** starting indexing thread {}", indexerId); while (!stop.get()) { long id = idGenerator.incrementAndGet(); - client("node2").prepareIndex("test", "type1", Long.toString(id)) + client("node1").prepareIndex("test", "type1", Long.toString(id)) 
.setSource(MapBuilder.newMapBuilder().put("test", "value" + id).map()).execute().actionGet(); } + logger.info("**** done indexing thread {}", indexerId); } finally { stopLatch.countDown(); } @@ -187,44 +79,234 @@ public class RecoveryWhileUnderLoadTests extends AbstractNodesTests { writers[i].start(); } + logger.info("--> waiting for 20000 docs to be indexed ..."); while (client("node1").prepareCount().setQuery(matchAllQuery()).execute().actionGet().count() < 20000) { Thread.sleep(100); client("node1").admin().indices().prepareRefresh().execute().actionGet(); } + logger.info("--> 20000 docs indexed"); + logger.info("--> flushing the index ...."); // now flush, just to make sure we have some data in the index, not just translog client("node1").admin().indices().prepareFlush().execute().actionGet(); + logger.info("--> waiting for 40000 docs to be indexed ..."); while (client("node1").prepareCount().setQuery(matchAllQuery()).execute().actionGet().count() < 40000) { Thread.sleep(100); client("node1").admin().indices().prepareRefresh().execute().actionGet(); } + logger.info("--> 40000 docs indexed"); - // now start nore nodes, while we index - startNode("node3"); - startNode("node4"); + logger.info("--> starting [node2] ..."); + // now start another node, while we index + startNode("node2"); + logger.info("--> waiting for GREEN health status ..."); + // make sure the cluster state is green, and all has been recovered assertThat(client("node1").admin().cluster().prepareHealth().setTimeout("1m").setWaitForGreenStatus().execute().actionGet().status(), equalTo(ClusterHealthStatus.GREEN)); - - while (client("node1").prepareCount().setQuery(matchAllQuery()).execute().actionGet().count() < 80000) { + logger.info("--> waiting for 100000 docs to be indexed ..."); + while (client("node1").prepareCount().setQuery(matchAllQuery()).execute().actionGet().count() < 100000) { Thread.sleep(100); client("node1").admin().indices().prepareRefresh().execute().actionGet(); } + logger.info("--> 100000 docs indexed"); - // now, shutdown nodes - closeNode("node1"); - assertThat(client("node2").admin().cluster().prepareHealth().setTimeout("1m").setWaitForGreenStatus().execute().actionGet().status(), equalTo(ClusterHealthStatus.GREEN)); - closeNode("node3"); - assertThat(client("node2").admin().cluster().prepareHealth().setTimeout("1m").setWaitForGreenStatus().execute().actionGet().status(), equalTo(ClusterHealthStatus.GREEN)); - closeNode("node4"); - assertThat(client("node2").admin().cluster().prepareHealth().setTimeout("1m").setWaitForYellowStatus().execute().actionGet().status(), equalTo(ClusterHealthStatus.YELLOW)); + logger.info("--> marking and waiting for indexing threads to stop ..."); + stop.set(true); + stopLatch.await(); + logger.info("--> indexing threads stopped"); + + logger.info("--> refreshing the index"); + client("node1").admin().indices().prepareRefresh().execute().actionGet(); + logger.info("--> verifying indexed content"); + for (int i = 0; i < 10; i++) { + assertThat(client("node1").prepareCount().setQuery(matchAllQuery()).execute().actionGet().count(), equalTo(idGenerator.get())); + } + } + + @Test public void recoverWhileUnderLoadAllocateBackupsRelocatePrimariesTest() throws Exception { + logger.info("--> starting [node1] ..."); + startNode("node1"); + + logger.info("--> creating test index ..."); + client("node1").admin().indices().prepareCreate("test").execute().actionGet(); + + final AtomicLong idGenerator = new AtomicLong(); + final AtomicBoolean stop = new AtomicBoolean(false); + Thread[] 
writers = new Thread[5]; + logger.info("--> starting {} indexing threads", writers.length); + final CountDownLatch stopLatch = new CountDownLatch(writers.length); + for (int i = 0; i < writers.length; i++) { + final int indexerId = i; + writers[i] = new Thread() { + @Override public void run() { + try { + logger.info("**** starting indexing thread {}", indexerId); + while (!stop.get()) { + long id = idGenerator.incrementAndGet(); + client("node1").prepareIndex("test", "type1", Long.toString(id)) + .setSource(MapBuilder.newMapBuilder().put("test", "value" + id).map()).execute().actionGet(); + } + logger.info("**** done indexing thread {}", indexerId); + } finally { + stopLatch.countDown(); + } + } + }; + writers[i].start(); + } + + logger.info("--> waiting for 20000 docs to be indexed ..."); + while (client("node1").prepareCount().setQuery(matchAllQuery()).execute().actionGet().count() < 20000) { + Thread.sleep(100); + client("node1").admin().indices().prepareRefresh().execute().actionGet(); + } + logger.info("--> 20000 docs indexed"); + + logger.info("--> flushing the index ...."); + // now flush, just to make sure we have some data in the index, not just translog + client("node1").admin().indices().prepareFlush().execute().actionGet(); + + + logger.info("--> waiting for 40000 docs to be indexed ..."); + while (client("node1").prepareCount().setQuery(matchAllQuery()).execute().actionGet().count() < 40000) { + Thread.sleep(100); + client("node1").admin().indices().prepareRefresh().execute().actionGet(); + } + logger.info("--> 40000 docs indexed"); + + logger.info("--> starting [node2] ..."); + startNode("node2"); + logger.info("--> starting [node3] ..."); + startNode("node3"); + logger.info("--> starting [node4] ..."); + startNode("node4"); + + logger.info("--> waiting for GREEN health status ..."); + assertThat(client("node1").admin().cluster().prepareHealth().setTimeout("1m").setWaitForGreenStatus().execute().actionGet().status(), equalTo(ClusterHealthStatus.GREEN)); + + + logger.info("--> waiting for 150000 docs to be indexed ..."); + while (client("node1").prepareCount().setQuery(matchAllQuery()).execute().actionGet().count() < 150000) { + Thread.sleep(100); + client("node1").admin().indices().prepareRefresh().execute().actionGet(); + } + logger.info("--> 150000 docs indexed"); stop.set(true); stopLatch.await(); + + logger.info("--> marking and waiting for indexing threads to stop ..."); + stop.set(true); + stopLatch.await(); + logger.info("--> indexing threads stopped"); + + logger.info("--> refreshing the index"); + client("node1").admin().indices().prepareRefresh().execute().actionGet(); + logger.info("--> verifying indexed content"); + for (int i = 0; i < 10; i++) { + assertThat(client("node1").prepareCount().setQuery(matchAllQuery()).execute().actionGet().count(), equalTo(idGenerator.get())); + } + } + + @Test public void recoverWhileUnderLoadWithNodeShutdown() throws Exception { + logger.info("--> starting [node1] ..."); + startNode("node1"); + logger.info("--> starting [node2] ..."); + startNode("node2"); + + logger.info("--> creating test index ..."); + client("node1").admin().indices().prepareCreate("test").execute().actionGet(); + + final AtomicLong idGenerator = new AtomicLong(); + final AtomicBoolean stop = new AtomicBoolean(false); + Thread[] writers = new Thread[5]; + final CountDownLatch stopLatch = new CountDownLatch(writers.length); + logger.info("--> starting {} indexing threads", writers.length); + for (int i = 0; i < writers.length; i++) { + final int indexerId = i; 
+ writers[i] = new Thread() { + @Override public void run() { + try { + logger.info("**** starting indexing thread {}", indexerId); + while (!stop.get()) { + long id = idGenerator.incrementAndGet(); + client("node2").prepareIndex("test", "type1", Long.toString(id)) + .setSource(MapBuilder.newMapBuilder().put("test", "value" + id).map()).execute().actionGet(); + } + logger.info("**** done indexing thread {}", indexerId); + } finally { + stopLatch.countDown(); + } + } + }; + writers[i].start(); + } + + logger.info("--> waiting for 20000 docs to be indexed ..."); + while (client("node1").prepareCount().setQuery(matchAllQuery()).execute().actionGet().count() < 20000) { + Thread.sleep(100); + client("node1").admin().indices().prepareRefresh().execute().actionGet(); + } + logger.info("--> 20000 docs indexed"); + + logger.info("--> flushing the index ...."); + // now flush, just to make sure we have some data in the index, not just translog + client("node1").admin().indices().prepareFlush().execute().actionGet(); + + + logger.info("--> waiting for 40000 docs to be indexed ..."); + while (client("node1").prepareCount().setQuery(matchAllQuery()).execute().actionGet().count() < 40000) { + Thread.sleep(100); + client("node1").admin().indices().prepareRefresh().execute().actionGet(); + } + logger.info("--> 40000 docs indexed"); + + // now start more nodes, while we index + logger.info("--> starting [node3] ..."); + startNode("node3"); + logger.info("--> starting [node4] ..."); + startNode("node4"); + + logger.info("--> waiting for GREEN health status ..."); + assertThat(client("node1").admin().cluster().prepareHealth().setTimeout("1m").setWaitForGreenStatus().execute().actionGet().status(), equalTo(ClusterHealthStatus.GREEN)); + + + logger.info("--> waiting for 100000 docs to be indexed ..."); + while (client("node1").prepareCount().setQuery(matchAllQuery()).execute().actionGet().count() < 100000) { + Thread.sleep(100); + client("node1").admin().indices().prepareRefresh().execute().actionGet(); + } + logger.info("--> 100000 docs indexed"); + + // now, shutdown nodes + logger.info("--> shutting down [node1] ..."); + closeNode("node1"); + logger.info("--> waiting for GREEN health status ..."); + assertThat(client("node2").admin().cluster().prepareHealth().setTimeout("1m").setWaitForGreenStatus().execute().actionGet().status(), equalTo(ClusterHealthStatus.GREEN)); + + logger.info("--> shutting down [node3] ..."); + closeNode("node3"); + logger.info("--> waiting for GREEN health status ..."); + assertThat(client("node2").admin().cluster().prepareHealth().setTimeout("1m").setWaitForGreenStatus().execute().actionGet().status(), equalTo(ClusterHealthStatus.GREEN)); + + logger.info("--> shutting down [node4] ..."); + closeNode("node4"); + logger.info("--> waiting for YELLOW health status ..."); assertThat(client("node2").admin().cluster().prepareHealth().setTimeout("1m").setWaitForYellowStatus().execute().actionGet().status(), equalTo(ClusterHealthStatus.YELLOW)); + logger.info("--> marking and waiting for indexing threads to stop ..."); + stop.set(true); + stopLatch.await(); + logger.info("--> indexing threads stopped"); + + assertThat(client("node2").admin().cluster().prepareHealth().setTimeout("1m").setWaitForYellowStatus().execute().actionGet().status(), equalTo(ClusterHealthStatus.YELLOW)); + + logger.info("--> refreshing the index"); + client("node2").admin().indices().prepareRefresh().execute().actionGet(); + logger.info("--> verifying indexed content"); 
client("node2").admin().indices().prepareRefresh().execute().actionGet(); for (int i = 0; i < 10; i++) { assertThat(client("node2").prepareCount().setQuery(matchAllQuery()).execute().actionGet().count(), equalTo(idGenerator.get()));