diff --git a/src/main/java/org/elasticsearch/cluster/routing/RoutingNode.java b/src/main/java/org/elasticsearch/cluster/routing/RoutingNode.java index 872f6b6a55b..4d46b3ec1f1 100644 --- a/src/main/java/org/elasticsearch/cluster/routing/RoutingNode.java +++ b/src/main/java/org/elasticsearch/cluster/routing/RoutingNode.java @@ -19,6 +19,7 @@ package org.elasticsearch.cluster.routing; +import org.elasticsearch.ElasticSearchIllegalStateException; import org.elasticsearch.cluster.node.DiscoveryNode; import java.util.ArrayList; @@ -65,6 +66,11 @@ public class RoutingNode implements Iterable { } public void add(MutableShardRouting shard) { + for (MutableShardRouting shardRouting : shards) { + if (shardRouting.shardId().equals(shard.shardId())) { + throw new ElasticSearchIllegalStateException("Trying to add a shard [" + shard.shardId().index().name() + "][" + shard.shardId().id() + "] to a node [" + nodeId + "] where it already exists"); + } + } shards.add(shard); shard.assignToNode(node.id()); } diff --git a/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java b/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java index 86ada7fb186..8f64ee4ef22 100644 --- a/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java +++ b/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java @@ -20,6 +20,7 @@ package org.elasticsearch.cluster.routing.allocation; import com.google.common.collect.Lists; +import org.elasticsearch.ElasticSearchIllegalStateException; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.*; @@ -229,6 +230,20 @@ public class AllocationService extends AbstractComponent { changed = true; shardEntry.moveFromPrimary(); shardEntry2.moveToPrimary(); + + if (shardEntry2.relocatingNodeId() != null) { + // its also relocating, make sure to move the other routing to primary + RoutingNode node = routingNodes.node(shardEntry2.relocatingNodeId()); + if (node != null) { + for (MutableShardRouting shardRouting : node) { + if (shardRouting.shardId().equals(shardEntry2.shardId()) && !shardRouting.primary()) { + shardRouting.moveToPrimary(); + break; + } + } + } + } + elected = true; break; } @@ -268,48 +283,82 @@ public class AllocationService extends AbstractComponent { for (RoutingNode routingNode : routingNodes) { for (Iterator shardsIterator = routingNode.shards().iterator(); shardsIterator.hasNext(); ) { MutableShardRouting shardRoutingEntry = shardsIterator.next(); - if (shardRoutingEntry.assignedToNode()) { - // we store the relocation state here since when we call de-assign node - // later on, we will loose this state - boolean relocating = shardRoutingEntry.relocating(); - String relocatingNodeId = shardRoutingEntry.relocatingNodeId(); - // is this the destination shard that we are relocating an existing shard to? - // we know this since it has a relocating node id (the node we relocate from) and our state is INITIALIZING (and not RELOCATING) - boolean isRelocationDestinationShard = relocatingNodeId != null && shardRoutingEntry.initializing(); + if (!shardRoutingEntry.assignedToNode()) { + throw new ElasticSearchIllegalStateException(shardRoutingEntry.shardId() + " is not assigned to a node, but listed on as existing on node [" + routingNode.nodeId() + "]"); + } + // we store the relocation state here since when we call de-assign node + // later on, we will loose this state + boolean relocating = shardRoutingEntry.relocating(); + String relocatingNodeId = shardRoutingEntry.relocatingNodeId(); + // is this the destination shard that we are relocating an existing shard to? + // we know this since it has a relocating node id (the node we relocate from) and our state is INITIALIZING (and not RELOCATING) + boolean isRelocationDestinationShard = relocatingNodeId != null && shardRoutingEntry.initializing(); - boolean currentNodeIsDead = false; - if (!liveNodeIds.contains(shardRoutingEntry.currentNodeId())) { + boolean currentNodeIsDead = false; + if (!liveNodeIds.contains(shardRoutingEntry.currentNodeId())) { + changed = true; + nodeIdsToRemove.add(shardRoutingEntry.currentNodeId()); + + if (!isRelocationDestinationShard) { + routingNodes.unassigned().add(shardRoutingEntry); + } + + shardRoutingEntry.deassignNode(); + currentNodeIsDead = true; + shardsIterator.remove(); + } + + // move source shard back to active state and cancel relocation mode. + if (relocating && !liveNodeIds.contains(relocatingNodeId)) { + nodeIdsToRemove.add(relocatingNodeId); + if (!currentNodeIsDead) { changed = true; - nodeIdsToRemove.add(shardRoutingEntry.currentNodeId()); - - if (!isRelocationDestinationShard) { - routingNodes.unassigned().add(shardRoutingEntry); - } - - shardRoutingEntry.deassignNode(); - currentNodeIsDead = true; - shardsIterator.remove(); + shardRoutingEntry.cancelRelocation(); } + } - // move source shard back to active state and cancel relocation mode. - if (relocating && !liveNodeIds.contains(relocatingNodeId)) { - nodeIdsToRemove.add(relocatingNodeId); - if (!currentNodeIsDead) { - changed = true; - shardRoutingEntry.cancelRelocation(); - } - } - - if (isRelocationDestinationShard && !liveNodeIds.contains(relocatingNodeId)) { - changed = true; - shardsIterator.remove(); - } + if (isRelocationDestinationShard && !liveNodeIds.contains(relocatingNodeId)) { + changed = true; + shardsIterator.remove(); } } } for (String nodeIdToRemove : nodeIdsToRemove) { routingNodes.nodesToShards().remove(nodeIdToRemove); } + + // now, go over shards that are initializing and recovering from primary shards that are now down... + for (RoutingNode routingNode : routingNodes) { + for (Iterator shardsIterator = routingNode.shards().iterator(); shardsIterator.hasNext(); ) { + MutableShardRouting shardRoutingEntry = shardsIterator.next(); + if (!shardRoutingEntry.assignedToNode()) { + throw new ElasticSearchIllegalStateException(shardRoutingEntry.shardId() + " is not assigned to a node, but listed on as existing on node [" + routingNode.nodeId() + "]"); + } + // we always recover from primaries, so we care about replicas that are not primaries + if (shardRoutingEntry.primary()) { + continue; + } + // if its not initializing, then its not recovering from the primary + if (!shardRoutingEntry.initializing()) { + continue; + } + // its initializing because its relocating from another node (its replica recovering from another replica) + if (shardRoutingEntry.relocatingNodeId() != null) { + continue; + } + for (MutableShardRouting unassignedShardRouting : routingNodes.unassigned()) { + // double check on the unassignedShardRouting.primary(), but it has to be a primary... (well, we double checked actually before...) + if (unassignedShardRouting.shardId().equals(shardRoutingEntry.shardId()) && unassignedShardRouting.primary()) { + // remove it... + routingNodes.unassigned().add(shardRoutingEntry); + shardRoutingEntry.deassignNode(); + shardsIterator.remove(); + break; + } + } + } + } + return changed; } diff --git a/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java b/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java index 79329d55af8..dc501f465b7 100644 --- a/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java +++ b/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java @@ -36,6 +36,10 @@ import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.component.Lifecycle; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.internal.Nullable; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Streamable; +import org.elasticsearch.common.io.stream.VoidStreamable; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.discovery.Discovery; @@ -51,8 +55,9 @@ import org.elasticsearch.gateway.GatewayService; import org.elasticsearch.node.service.NodeService; import org.elasticsearch.node.settings.NodeSettingsService; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.transport.TransportService; +import org.elasticsearch.transport.*; +import java.io.IOException; import java.util.List; import java.util.Map; import java.util.Set; @@ -148,6 +153,8 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen this.publishClusterState = new PublishClusterStateAction(settings, transportService, this, new NewClusterStateListener()); this.pingService.setNodesProvider(this); this.membership = new MembershipAction(settings, transportService, this, new MembershipListener()); + + transportService.registerHandler(RejoinClusterRequestHandler.ACTION, new RejoinClusterRequestHandler()); } @Override @@ -362,7 +369,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen currentState = newClusterStateBuilder().state(currentState).nodes(latestDiscoNodes).build(); // check if we have enough master nodes, if not, we need to move into joining the cluster again if (!electMaster.hasEnoughMasterNodes(currentState.nodes())) { - return disconnectFromCluster(currentState, "not enough master nodes"); + return rejoin(currentState, "not enough master nodes"); } return currentState; } @@ -391,7 +398,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen currentState = newClusterStateBuilder().state(currentState).nodes(latestDiscoNodes).build(); // check if we have enough master nodes, if not, we need to move into joining the cluster again if (!electMaster.hasEnoughMasterNodes(currentState.nodes())) { - return disconnectFromCluster(currentState, "not enough master nodes"); + return rejoin(currentState, "not enough master nodes"); } return currentState; } @@ -430,7 +437,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen .masterNodeId(null); if (!electMaster.hasEnoughMasterNodes(nodesBuilder.build())) { - return disconnectFromCluster(ClusterState.builder().state(currentState).nodes(nodesBuilder).build(), "not enough master nodes after master left (reason = " + reason + ")"); + return rejoin(ClusterState.builder().state(currentState).nodes(nodesBuilder).build(), "not enough master nodes after master left (reason = " + reason + ")"); } final DiscoveryNode electedMaster = electMaster.electMaster(nodesBuilder.build()); // elect master @@ -451,7 +458,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen .nodes(latestDiscoNodes) .build(); } else { - return disconnectFromCluster(newClusterStateBuilder().state(currentState).nodes(nodesBuilder.build()).build(), "master_left and no other node elected to become master"); + return rejoin(newClusterStateBuilder().state(currentState).nodes(nodesBuilder.build()).build(), "master_left and no other node elected to become master"); } } } @@ -466,7 +473,24 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen void handleNewClusterStateFromMaster(final ClusterState newState) { if (master) { - logger.warn("master should not receive new cluster state from [{}]", newState.nodes().masterNode()); + clusterService.submitStateUpdateTask("zen-disco-master_receive_cluster_state_from_another_master [" + newState.nodes().masterNode() + "]", new ClusterStateUpdateTask() { + @Override + public ClusterState execute(ClusterState currentState) { + if (newState.version() > currentState.version()) { + logger.warn("received cluster state from [{}] which is also master but with a newer cluster_state, rejoining to cluster...", newState.nodes().masterNode()); + return rejoin(currentState, "zen-disco-master_receive_cluster_state_from_another_master [" + newState.nodes().masterNode() + "]"); + } else { + logger.warn("received cluster state from [{}] which is also master but with an older cluster_state, telling [{}] to rejoin the cluster", newState.nodes().masterNode(), newState.nodes().masterNode()); + transportService.sendRequest(newState.nodes().masterNode(), RejoinClusterRequestHandler.ACTION, new RejoinClusterRequest(currentState.nodes().localNodeId()), new VoidTransportResponseHandler(ThreadPool.Names.SAME) { + @Override + public void handleException(TransportException exp) { + logger.warn("failed to send rejoin request to [{}]", exp, newState.nodes().masterNode()); + } + }); + return currentState; + } + } + }); } else { if (newState.nodes().localNode() == null) { logger.warn("received a cluster state from [{}] and not part of the cluster, should not happen", newState.nodes().masterNode()); @@ -634,7 +658,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen return null; } - private ClusterState disconnectFromCluster(ClusterState clusterState, String reason) { + private ClusterState rejoin(ClusterState clusterState, String reason) { logger.warn(reason + ", current nodes: {}", clusterState.nodes()); nodesFD.stop(); masterFD.stop(reason); @@ -716,4 +740,57 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen } } } + + static class RejoinClusterRequest implements Streamable { + + private String fromNodeId; + + RejoinClusterRequest(String fromNodeId) { + this.fromNodeId = fromNodeId; + } + + RejoinClusterRequest() { + } + + @Override + public void readFrom(StreamInput in) throws IOException { + fromNodeId = in.readOptionalUTF(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeOptionalUTF(fromNodeId); + } + } + + class RejoinClusterRequestHandler extends BaseTransportRequestHandler { + + static final String ACTION = "discovery/zen/rejoin"; + + @Override + public RejoinClusterRequest newInstance() { + return new RejoinClusterRequest(); + } + + @Override + public void messageReceived(final RejoinClusterRequest request, final TransportChannel channel) throws Exception { + clusterService.submitStateUpdateTask("received a request to rejoin the cluster from [" + request.fromNodeId + "]", new ClusterStateUpdateTask() { + @Override + public ClusterState execute(ClusterState currentState) { + try { + channel.sendResponse(VoidStreamable.INSTANCE); + } catch (Exception e) { + logger.warn("failed to send response on rejoin cluster request handling", e); + } + return rejoin(currentState, "received a request to rejoin the cluster from [" + request.fromNodeId + "]"); + } + }); + } + + @Override + public String executor() { + return ThreadPool.Names.SAME; + } + } + } diff --git a/src/main/java/org/elasticsearch/discovery/zen/fd/MasterFaultDetection.java b/src/main/java/org/elasticsearch/discovery/zen/fd/MasterFaultDetection.java index e38319fa9c0..88530c55725 100644 --- a/src/main/java/org/elasticsearch/discovery/zen/fd/MasterFaultDetection.java +++ b/src/main/java/org/elasticsearch/discovery/zen/fd/MasterFaultDetection.java @@ -305,8 +305,17 @@ public class MasterFaultDetection extends AbstractComponent { // check if the master node did not get switched on us... if (masterToPing.equals(MasterFaultDetection.this.masterNode())) { if (exp.getCause() instanceof NoLongerMasterException) { - logger.debug("[master] pinging a master {} that is no longer a master", masterNode, pingRetryCount, pingRetryTimeout); + logger.debug("[master] pinging a master {} that is no longer a master", masterNode); notifyMasterFailure(masterToPing, "no longer master"); + return; + } else if (exp.getCause() instanceof NotMasterException) { + logger.debug("[master] pinging a master {} that is not the master", masterNode); + notifyMasterFailure(masterToPing, "no longer master"); + return; + } else if (exp.getCause() instanceof NodeDoesNotExistOnMasterException) { + logger.debug("[master] pinging a master {} but we do not exists on it, act as if its master failure", masterNode); + notifyMasterFailure(masterToPing, "do not exists on master, act as master failure"); + return; } int retryCount = ++MasterFaultDetection.this.retryCount; logger.trace("[master] failed to ping [{}], retry [{}] out of [{}]", exp, masterNode, retryCount, pingRetryCount); @@ -330,7 +339,21 @@ public class MasterFaultDetection extends AbstractComponent { } } - private static class NoLongerMasterException extends ElasticSearchIllegalStateException { + static class NoLongerMasterException extends ElasticSearchIllegalStateException { + @Override + public Throwable fillInStackTrace() { + return null; + } + } + + static class NotMasterException extends ElasticSearchIllegalStateException { + @Override + public Throwable fillInStackTrace() { + return null; + } + } + + static class NodeDoesNotExistOnMasterException extends ElasticSearchIllegalStateException { @Override public Throwable fillInStackTrace() { return null; @@ -352,12 +375,15 @@ public class MasterFaultDetection extends AbstractComponent { // check if we are really the same master as the one we seemed to be think we are // this can happen if the master got "kill -9" and then another node started using the same port if (!request.masterNodeId.equals(nodes.localNodeId())) { - throw new ElasticSearchIllegalStateException("Got ping as master with id [" + request.masterNodeId + "], but not master and no id"); + throw new NotMasterException(); } // if we are no longer master, fail... if (!nodes.localNodeMaster()) { throw new NoLongerMasterException(); } + if (!nodes.nodeExists(request.nodeId)) { + throw new NodeDoesNotExistOnMasterException(); + } // send a response, and note if we are connected to the master or not channel.sendResponse(new MasterPingResponseResponse(nodes.nodeExists(request.nodeId))); } diff --git a/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java b/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java index ab3273a471f..acfe3db079e 100644 --- a/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java +++ b/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java @@ -279,7 +279,7 @@ public class RecoveryTarget extends AbstractComponent { synchronized (entry.getValue()) { try { entry.getValue().close(); - } catch (IOException e) { + } catch (Exception e) { // ignore } } @@ -431,11 +431,11 @@ public class RecoveryTarget extends AbstractComponent { // first, we go and move files that were created with the recovery id suffix to // the actual names, its ok if we have a corrupted index here, since we have replicas // to recover from in case of a full cluster shutdown just when this code executes... - String suffix = "." + onGoingRecovery.startTime; + String prefix = "recovery." + onGoingRecovery.startTime + "."; Set filesToRename = Sets.newHashSet(); for (String existingFile : shard.store().directory().listAll()) { - if (existingFile.endsWith(suffix)) { - filesToRename.add(existingFile.substring(0, existingFile.length() - suffix.length())); + if (existingFile.startsWith(prefix)) { + filesToRename.add(existingFile.substring(prefix.length(), existingFile.length())); } } Exception failureToRename = null; @@ -447,7 +447,7 @@ public class RecoveryTarget extends AbstractComponent { for (String fileToRename : filesToRename) { // now, rename the files... try { - shard.store().renameFile(fileToRename + suffix, fileToRename); + shard.store().renameFile(prefix + fileToRename, fileToRename); } catch (Exception e) { failureToRename = e; break; @@ -517,7 +517,7 @@ public class RecoveryTarget extends AbstractComponent { String name = request.name(); if (shard.store().directory().fileExists(name)) { - name = name + "." + onGoingRecovery.startTime; + name = "recovery." + onGoingRecovery.startTime + "." + name; } indexOutput = shard.store().createOutputRaw(name); diff --git a/src/main/java/org/elasticsearch/transport/netty/MessageChannelHandler.java b/src/main/java/org/elasticsearch/transport/netty/MessageChannelHandler.java index acc24a86513..622c9247175 100644 --- a/src/main/java/org/elasticsearch/transport/netty/MessageChannelHandler.java +++ b/src/main/java/org/elasticsearch/transport/netty/MessageChannelHandler.java @@ -20,6 +20,7 @@ package org.elasticsearch.transport.netty; import org.elasticsearch.ElasticSearchIllegalStateException; +import org.elasticsearch.common.component.Lifecycle; import org.elasticsearch.common.compress.Compressor; import org.elasticsearch.common.compress.CompressorFactory; import org.elasticsearch.common.io.ThrowableObjectInputStream; @@ -388,11 +389,14 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler { try { handler.messageReceived(streamable, transportChannel); } catch (Throwable e) { - try { - transportChannel.sendResponse(e); - } catch (IOException e1) { - logger.warn("Failed to send error message back to client for action [" + action + "]", e1); - logger.warn("Actual Exception", e); + if (transport.lifecycleState() == Lifecycle.State.STARTED) { + // we can only send a response transport is started.... + try { + transportChannel.sendResponse(e); + } catch (IOException e1) { + logger.warn("Failed to send error message back to client for action [" + action + "]", e1); + logger.warn("Actual Exception", e); + } } } } diff --git a/src/test/java/org/elasticsearch/test/stress/rollingrestart/QuickRollingRestartStressTest.java b/src/test/java/org/elasticsearch/test/stress/rollingrestart/QuickRollingRestartStressTest.java new file mode 100644 index 00000000000..51f116af53a --- /dev/null +++ b/src/test/java/org/elasticsearch/test/stress/rollingrestart/QuickRollingRestartStressTest.java @@ -0,0 +1,120 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.test.stress.rollingrestart; + +import jsr166y.ThreadLocalRandom; +import org.elasticsearch.ElasticSearchException; +import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.common.RandomStringGenerator; +import org.elasticsearch.common.settings.ImmutableSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.SizeValue; +import org.elasticsearch.node.Node; +import org.elasticsearch.node.NodeBuilder; + +import java.util.Date; + +/** + */ +public class QuickRollingRestartStressTest { + + public static void main(String[] args) throws Exception { + System.setProperty("es.logger.prefix", ""); + + Settings settings = ImmutableSettings.settingsBuilder().build(); + + Node[] nodes = new Node[5]; + for (int i = 0; i < nodes.length; i++) { + nodes[i] = NodeBuilder.nodeBuilder().settings(settings).node(); + } + + Node client = NodeBuilder.nodeBuilder().client(true).node(); + + long COUNT; + if (client.client().admin().indices().prepareExists("test").execute().actionGet().exists()) { + ClusterHealthResponse clusterHealthResponse = client.client().admin().cluster().prepareHealth().setWaitForGreenStatus().setTimeout("10m").execute().actionGet(); + if (clusterHealthResponse.timedOut()) { + throw new ElasticSearchException("failed to wait for green state on startup..."); + } + COUNT = client.client().prepareCount().execute().actionGet().count(); + System.out.println("--> existing index, count [" + COUNT + "]"); + } else { + COUNT = SizeValue.parseSizeValue("100k").singles(); + System.out.println("--> indexing data..."); + for (long i = 0; i < COUNT; i++) { + client.client().prepareIndex("test", "type", Long.toString(i)) + .setSource("date", new Date(), "data", RandomStringGenerator.randomAlphabetic(10000)) + .execute().actionGet(); + } + System.out.println("--> done indexing data [" + COUNT + "]"); + client.client().admin().indices().prepareRefresh().execute().actionGet(); + for (int i = 0; i < 10; i++) { + long count = client.client().prepareCount().execute().actionGet().count(); + if (COUNT != count) { + System.err.println("--> the indexed docs do not match the count..., got [" + count + "], expected [" + COUNT + "]"); + } + } + } + + final int ROLLING_RESTARTS = 100; + System.out.println("--> starting rolling restarts [" + ROLLING_RESTARTS + "]"); + for (int rollingRestart = 0; rollingRestart < ROLLING_RESTARTS; rollingRestart++) { + System.out.println("--> doing rolling restart [" + rollingRestart + "]..."); + int nodeId = ThreadLocalRandom.current().nextInt(); + for (int i = 0; i < nodes.length; i++) { + int nodeIdx = Math.abs(nodeId++) % nodes.length; + nodes[nodeIdx].close(); + nodes[nodeIdx] = NodeBuilder.nodeBuilder().settings(settings).node(); + } + System.out.println("--> done rolling restart [" + rollingRestart + "]"); + + System.out.println("--> waiting for green state now..."); + ClusterHealthResponse clusterHealthResponse = client.client().admin().cluster().prepareHealth().setWaitForGreenStatus().setTimeout("10m").execute().actionGet(); + if (clusterHealthResponse.timedOut()) { + System.err.println("--> timed out waiting for green state..."); + ClusterState state = client.client().admin().cluster().prepareState().execute().actionGet().state(); + System.out.println(state.routingTable().prettyPrint()); + throw new ElasticSearchException("timed out waiting for green state"); + } else { + System.out.println("--> got green status"); + } + + System.out.println("--> checking data [" + rollingRestart + "]...."); + boolean failed = false; + for (int i = 0; i < 10; i++) { + long count = client.client().prepareCount().execute().actionGet().count(); + if (COUNT != count) { + failed = true; + System.err.println("--> ERROR the indexed docs do not match the count..., got [" + count + "], expected [" + COUNT + "]"); + } + } + if (!failed) { + System.out.println("--> count verified"); + } + } + + System.out.println("--> shutting down..."); + client.close(); + for (Node node : nodes) { + node.close(); + } + } +}