Improve cluster resiliency to disconnected sub-clusters + fix a shard allocation bug with quick rolling restarts

Two main changes:

Improve cluster resiliency to disconnected sub-clusters. If a node pings the master and is no longer registered with it, that node now goes through an improved process of rejoining the cluster. Also, if a master receives a cluster state from another master, one of the two is forced to rejoin the cluster, decided by cluster state version.
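
The master-vs-master case comes down to comparing cluster state versions: the master holding the older state steps down and rejoins, while the master holding the newer state keeps going and tells the other one to rejoin (in this commit, via a new discovery/zen/rejoin transport action). A minimal, self-contained sketch of that decision, with illustrative names rather than the actual Elasticsearch classes; the real handling is in the ZenDiscovery hunk below:

// Hedged sketch, illustrative types only: when a node that believes it is master
// receives a cluster state published by another master, the tie is broken by
// cluster state version.
public class MasterConflictSketch {

    enum Action { LOCAL_MASTER_REJOINS, TELL_OTHER_MASTER_TO_REJOIN }

    // The master holding the older (lower-version) cluster state rejoins the cluster;
    // the master holding the newer state stays and asks the other one to rejoin.
    static Action resolve(long localStateVersion, long receivedStateVersion) {
        if (receivedStateVersion > localStateVersion) {
            return Action.LOCAL_MASTER_REJOINS;       // our state is stale, we rejoin
        }
        return Action.TELL_OTHER_MASTER_TO_REJOIN;    // send the other master a rejoin request
    }

    public static void main(String[] args) {
        System.out.println(resolve(42L, 57L)); // LOCAL_MASTER_REJOINS
        System.out.println(resolve(57L, 42L)); // TELL_OTHER_MASTER_TO_REJOIN
    }
}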
On a quick rolling restart, without waiting for shard allocation to complete, the shard allocation logic could get its counts out of sync, leading to odd decisions when allocating shards, or to validation failures on routing table allocation.
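
Part of that fix is a sanity check that a single node never ends up holding two copies of the same shard, the invariant the quick-rolling-restart race was violating. A stripped-down sketch of that guard using stand-in types rather than the real routing classes; the actual check is the new ElasticSearchIllegalStateException in RoutingNode.add below:

// Hedged sketch, stand-in types only: a routing node rejects a second copy of a shard
// it already holds, mirroring the new guard in RoutingNode.add.
import java.util.ArrayList;
import java.util.List;

public class RoutingNodeSketch {

    static class ShardId {
        final String index;
        final int id;
        ShardId(String index, int id) { this.index = index; this.id = id; }
        boolean same(ShardId other) { return index.equals(other.index) && id == other.id; }
    }

    private final String nodeId;
    private final List<ShardId> shards = new ArrayList<ShardId>();

    RoutingNodeSketch(String nodeId) { this.nodeId = nodeId; }

    void add(ShardId shard) {
        for (ShardId existing : shards) {
            if (existing.same(shard)) {
                throw new IllegalStateException("shard [" + shard.index + "][" + shard.id
                        + "] already exists on node [" + nodeId + "]");
            }
        }
        shards.add(shard);
    }

    public static void main(String[] args) {
        RoutingNodeSketch node = new RoutingNodeSketch("node1");
        node.add(new ShardId("test", 0));
        try {
            node.add(new ShardId("test", 0)); // duplicate assignment, the guard trips
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}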
Shay Banon 2012-06-22 03:36:54 +02:00
parent 90371beedc
commit cc3fab45ff
7 changed files with 335 additions and 53 deletions

View File: RoutingNode.java

@@ -19,6 +19,7 @@
package org.elasticsearch.cluster.routing;
import org.elasticsearch.ElasticSearchIllegalStateException;
import org.elasticsearch.cluster.node.DiscoveryNode;
import java.util.ArrayList;
@@ -65,6 +66,11 @@ public class RoutingNode implements Iterable<MutableShardRouting> {
}
public void add(MutableShardRouting shard) {
for (MutableShardRouting shardRouting : shards) {
if (shardRouting.shardId().equals(shard.shardId())) {
throw new ElasticSearchIllegalStateException("Trying to add a shard [" + shard.shardId().index().name() + "][" + shard.shardId().id() + "] to a node [" + nodeId + "] where it already exists");
}
}
shards.add(shard);
shard.assignToNode(node.id());
}

View File: AllocationService.java

@@ -20,6 +20,7 @@
package org.elasticsearch.cluster.routing.allocation;
import com.google.common.collect.Lists;
import org.elasticsearch.ElasticSearchIllegalStateException;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.*;
@@ -229,6 +230,20 @@ public class AllocationService extends AbstractComponent {
changed = true;
shardEntry.moveFromPrimary();
shardEntry2.moveToPrimary();
if (shardEntry2.relocatingNodeId() != null) {
// its also relocating, make sure to move the other routing to primary
RoutingNode node = routingNodes.node(shardEntry2.relocatingNodeId());
if (node != null) {
for (MutableShardRouting shardRouting : node) {
if (shardRouting.shardId().equals(shardEntry2.shardId()) && !shardRouting.primary()) {
shardRouting.moveToPrimary();
break;
}
}
}
}
elected = true;
break;
}
@@ -268,48 +283,82 @@
for (RoutingNode routingNode : routingNodes) {
for (Iterator<MutableShardRouting> shardsIterator = routingNode.shards().iterator(); shardsIterator.hasNext(); ) {
MutableShardRouting shardRoutingEntry = shardsIterator.next();
if (shardRoutingEntry.assignedToNode()) {
// we store the relocation state here since when we call de-assign node
// later on, we will lose this state
boolean relocating = shardRoutingEntry.relocating();
String relocatingNodeId = shardRoutingEntry.relocatingNodeId();
// is this the destination shard that we are relocating an existing shard to?
// we know this since it has a relocating node id (the node we relocate from) and our state is INITIALIZING (and not RELOCATING)
boolean isRelocationDestinationShard = relocatingNodeId != null && shardRoutingEntry.initializing();
if (!shardRoutingEntry.assignedToNode()) {
throw new ElasticSearchIllegalStateException(shardRoutingEntry.shardId() + " is not assigned to a node, but listed as existing on node [" + routingNode.nodeId() + "]");
}
// we store the relocation state here since when we call de-assign node
// later on, we will lose this state
boolean relocating = shardRoutingEntry.relocating();
String relocatingNodeId = shardRoutingEntry.relocatingNodeId();
// is this the destination shard that we are relocating an existing shard to?
// we know this since it has a relocating node id (the node we relocate from) and our state is INITIALIZING (and not RELOCATING)
boolean isRelocationDestinationShard = relocatingNodeId != null && shardRoutingEntry.initializing();
boolean currentNodeIsDead = false;
if (!liveNodeIds.contains(shardRoutingEntry.currentNodeId())) {
boolean currentNodeIsDead = false;
if (!liveNodeIds.contains(shardRoutingEntry.currentNodeId())) {
changed = true;
nodeIdsToRemove.add(shardRoutingEntry.currentNodeId());
if (!isRelocationDestinationShard) {
routingNodes.unassigned().add(shardRoutingEntry);
}
shardRoutingEntry.deassignNode();
currentNodeIsDead = true;
shardsIterator.remove();
}
// move source shard back to active state and cancel relocation mode.
if (relocating && !liveNodeIds.contains(relocatingNodeId)) {
nodeIdsToRemove.add(relocatingNodeId);
if (!currentNodeIsDead) {
changed = true;
nodeIdsToRemove.add(shardRoutingEntry.currentNodeId());
if (!isRelocationDestinationShard) {
routingNodes.unassigned().add(shardRoutingEntry);
}
shardRoutingEntry.deassignNode();
currentNodeIsDead = true;
shardsIterator.remove();
shardRoutingEntry.cancelRelocation();
}
}
// move source shard back to active state and cancel relocation mode.
if (relocating && !liveNodeIds.contains(relocatingNodeId)) {
nodeIdsToRemove.add(relocatingNodeId);
if (!currentNodeIsDead) {
changed = true;
shardRoutingEntry.cancelRelocation();
}
}
if (isRelocationDestinationShard && !liveNodeIds.contains(relocatingNodeId)) {
changed = true;
shardsIterator.remove();
}
if (isRelocationDestinationShard && !liveNodeIds.contains(relocatingNodeId)) {
changed = true;
shardsIterator.remove();
}
}
}
for (String nodeIdToRemove : nodeIdsToRemove) {
routingNodes.nodesToShards().remove(nodeIdToRemove);
}
// now, go over shards that are initializing and recovering from primary shards that are now down...
for (RoutingNode routingNode : routingNodes) {
for (Iterator<MutableShardRouting> shardsIterator = routingNode.shards().iterator(); shardsIterator.hasNext(); ) {
MutableShardRouting shardRoutingEntry = shardsIterator.next();
if (!shardRoutingEntry.assignedToNode()) {
throw new ElasticSearchIllegalStateException(shardRoutingEntry.shardId() + " is not assigned to a node, but listed as existing on node [" + routingNode.nodeId() + "]");
}
// we always recover from primaries, so we care about replicas that are not primaries
if (shardRoutingEntry.primary()) {
continue;
}
// if its not initializing, then its not recovering from the primary
if (!shardRoutingEntry.initializing()) {
continue;
}
// its initializing because its relocating from another node (its replica recovering from another replica)
if (shardRoutingEntry.relocatingNodeId() != null) {
continue;
}
for (MutableShardRouting unassignedShardRouting : routingNodes.unassigned()) {
// double check on the unassignedShardRouting.primary(), but it has to be a primary... (well, we double checked actually before...)
if (unassignedShardRouting.shardId().equals(shardRoutingEntry.shardId()) && unassignedShardRouting.primary()) {
// remove it...
routingNodes.unassigned().add(shardRoutingEntry);
shardRoutingEntry.deassignNode();
shardsIterator.remove();
break;
}
}
}
}
return changed;
}

View File: ZenDiscovery.java

@@ -36,6 +36,10 @@ import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.component.Lifecycle;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.inject.internal.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.io.stream.VoidStreamable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.discovery.Discovery;
@@ -51,8 +55,9 @@ import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.node.service.NodeService;
import org.elasticsearch.node.settings.NodeSettingsService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.transport.*;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -148,6 +153,8 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
this.publishClusterState = new PublishClusterStateAction(settings, transportService, this, new NewClusterStateListener());
this.pingService.setNodesProvider(this);
this.membership = new MembershipAction(settings, transportService, this, new MembershipListener());
transportService.registerHandler(RejoinClusterRequestHandler.ACTION, new RejoinClusterRequestHandler());
}
@Override
@@ -362,7 +369,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
currentState = newClusterStateBuilder().state(currentState).nodes(latestDiscoNodes).build();
// check if we have enough master nodes, if not, we need to move into joining the cluster again
if (!electMaster.hasEnoughMasterNodes(currentState.nodes())) {
return disconnectFromCluster(currentState, "not enough master nodes");
return rejoin(currentState, "not enough master nodes");
}
return currentState;
}
@@ -391,7 +398,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
currentState = newClusterStateBuilder().state(currentState).nodes(latestDiscoNodes).build();
// check if we have enough master nodes, if not, we need to move into joining the cluster again
if (!electMaster.hasEnoughMasterNodes(currentState.nodes())) {
return disconnectFromCluster(currentState, "not enough master nodes");
return rejoin(currentState, "not enough master nodes");
}
return currentState;
}
@@ -430,7 +437,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
.masterNodeId(null);
if (!electMaster.hasEnoughMasterNodes(nodesBuilder.build())) {
return disconnectFromCluster(ClusterState.builder().state(currentState).nodes(nodesBuilder).build(), "not enough master nodes after master left (reason = " + reason + ")");
return rejoin(ClusterState.builder().state(currentState).nodes(nodesBuilder).build(), "not enough master nodes after master left (reason = " + reason + ")");
}
final DiscoveryNode electedMaster = electMaster.electMaster(nodesBuilder.build()); // elect master
@@ -451,7 +458,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
.nodes(latestDiscoNodes)
.build();
} else {
return disconnectFromCluster(newClusterStateBuilder().state(currentState).nodes(nodesBuilder.build()).build(), "master_left and no other node elected to become master");
return rejoin(newClusterStateBuilder().state(currentState).nodes(nodesBuilder.build()).build(), "master_left and no other node elected to become master");
}
}
}
@@ -466,7 +473,24 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
void handleNewClusterStateFromMaster(final ClusterState newState) {
if (master) {
logger.warn("master should not receive new cluster state from [{}]", newState.nodes().masterNode());
clusterService.submitStateUpdateTask("zen-disco-master_receive_cluster_state_from_another_master [" + newState.nodes().masterNode() + "]", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
if (newState.version() > currentState.version()) {
logger.warn("received cluster state from [{}] which is also master but with a newer cluster_state, rejoining to cluster...", newState.nodes().masterNode());
return rejoin(currentState, "zen-disco-master_receive_cluster_state_from_another_master [" + newState.nodes().masterNode() + "]");
} else {
logger.warn("received cluster state from [{}] which is also master but with an older cluster_state, telling [{}] to rejoin the cluster", newState.nodes().masterNode(), newState.nodes().masterNode());
transportService.sendRequest(newState.nodes().masterNode(), RejoinClusterRequestHandler.ACTION, new RejoinClusterRequest(currentState.nodes().localNodeId()), new VoidTransportResponseHandler(ThreadPool.Names.SAME) {
@Override
public void handleException(TransportException exp) {
logger.warn("failed to send rejoin request to [{}]", exp, newState.nodes().masterNode());
}
});
return currentState;
}
}
});
} else {
if (newState.nodes().localNode() == null) {
logger.warn("received a cluster state from [{}] and not part of the cluster, should not happen", newState.nodes().masterNode());
@@ -634,7 +658,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
return null;
}
private ClusterState disconnectFromCluster(ClusterState clusterState, String reason) {
private ClusterState rejoin(ClusterState clusterState, String reason) {
logger.warn(reason + ", current nodes: {}", clusterState.nodes());
nodesFD.stop();
masterFD.stop(reason);
@@ -716,4 +740,57 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
}
}
}
static class RejoinClusterRequest implements Streamable {
private String fromNodeId;
RejoinClusterRequest(String fromNodeId) {
this.fromNodeId = fromNodeId;
}
RejoinClusterRequest() {
}
@Override
public void readFrom(StreamInput in) throws IOException {
fromNodeId = in.readOptionalUTF();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalUTF(fromNodeId);
}
}
class RejoinClusterRequestHandler extends BaseTransportRequestHandler<RejoinClusterRequest> {
static final String ACTION = "discovery/zen/rejoin";
@Override
public RejoinClusterRequest newInstance() {
return new RejoinClusterRequest();
}
@Override
public void messageReceived(final RejoinClusterRequest request, final TransportChannel channel) throws Exception {
clusterService.submitStateUpdateTask("received a request to rejoin the cluster from [" + request.fromNodeId + "]", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
try {
channel.sendResponse(VoidStreamable.INSTANCE);
} catch (Exception e) {
logger.warn("failed to send response on rejoin cluster request handling", e);
}
return rejoin(currentState, "received a request to rejoin the cluster from [" + request.fromNodeId + "]");
}
});
}
@Override
public String executor() {
return ThreadPool.Names.SAME;
}
}
}

View File: MasterFaultDetection.java

@@ -305,8 +305,17 @@ public class MasterFaultDetection extends AbstractComponent {
// check if the master node did not get switched on us...
if (masterToPing.equals(MasterFaultDetection.this.masterNode())) {
if (exp.getCause() instanceof NoLongerMasterException) {
logger.debug("[master] pinging a master {} that is no longer a master", masterNode, pingRetryCount, pingRetryTimeout);
logger.debug("[master] pinging a master {} that is no longer a master", masterNode);
notifyMasterFailure(masterToPing, "no longer master");
return;
} else if (exp.getCause() instanceof NotMasterException) {
logger.debug("[master] pinging a master {} that is not the master", masterNode);
notifyMasterFailure(masterToPing, "no longer master");
return;
} else if (exp.getCause() instanceof NodeDoesNotExistOnMasterException) {
logger.debug("[master] pinging a master {} but we do not exists on it, act as if its master failure", masterNode);
notifyMasterFailure(masterToPing, "do not exists on master, act as master failure");
return;
}
int retryCount = ++MasterFaultDetection.this.retryCount;
logger.trace("[master] failed to ping [{}], retry [{}] out of [{}]", exp, masterNode, retryCount, pingRetryCount);
@@ -330,7 +339,21 @@ public class MasterFaultDetection extends AbstractComponent {
}
}
private static class NoLongerMasterException extends ElasticSearchIllegalStateException {
static class NoLongerMasterException extends ElasticSearchIllegalStateException {
@Override
public Throwable fillInStackTrace() {
return null;
}
}
static class NotMasterException extends ElasticSearchIllegalStateException {
@Override
public Throwable fillInStackTrace() {
return null;
}
}
static class NodeDoesNotExistOnMasterException extends ElasticSearchIllegalStateException {
@Override
public Throwable fillInStackTrace() {
return null;
@@ -352,12 +375,15 @@ public class MasterFaultDetection extends AbstractComponent {
// check if we are really the same master as the one we seemed to be think we are
// this can happen if the master got "kill -9" and then another node started using the same port
if (!request.masterNodeId.equals(nodes.localNodeId())) {
throw new ElasticSearchIllegalStateException("Got ping as master with id [" + request.masterNodeId + "], but not master and no id");
throw new NotMasterException();
}
// if we are no longer master, fail...
if (!nodes.localNodeMaster()) {
throw new NoLongerMasterException();
}
if (!nodes.nodeExists(request.nodeId)) {
throw new NodeDoesNotExistOnMasterException();
}
// send a response, and note if we are connected to the master or not
channel.sendResponse(new MasterPingResponseResponse(nodes.nodeExists(request.nodeId)));
}

View File: RecoveryTarget.java

@@ -279,7 +279,7 @@ public class RecoveryTarget extends AbstractComponent {
synchronized (entry.getValue()) {
try {
entry.getValue().close();
} catch (IOException e) {
} catch (Exception e) {
// ignore
}
}
@@ -431,11 +431,11 @@ public class RecoveryTarget extends AbstractComponent {
// first, we go and move files that were created with the recovery id suffix to
// the actual names, its ok if we have a corrupted index here, since we have replicas
// to recover from in case of a full cluster shutdown just when this code executes...
String suffix = "." + onGoingRecovery.startTime;
String prefix = "recovery." + onGoingRecovery.startTime + ".";
Set<String> filesToRename = Sets.newHashSet();
for (String existingFile : shard.store().directory().listAll()) {
if (existingFile.endsWith(suffix)) {
filesToRename.add(existingFile.substring(0, existingFile.length() - suffix.length()));
if (existingFile.startsWith(prefix)) {
filesToRename.add(existingFile.substring(prefix.length(), existingFile.length()));
}
}
Exception failureToRename = null;
@@ -447,7 +447,7 @@ public class RecoveryTarget extends AbstractComponent {
for (String fileToRename : filesToRename) {
// now, rename the files...
try {
shard.store().renameFile(fileToRename + suffix, fileToRename);
shard.store().renameFile(prefix + fileToRename, fileToRename);
} catch (Exception e) {
failureToRename = e;
break;
@@ -517,7 +517,7 @@ public class RecoveryTarget extends AbstractComponent {
String name = request.name();
if (shard.store().directory().fileExists(name)) {
name = name + "." + onGoingRecovery.startTime;
name = "recovery." + onGoingRecovery.startTime + "." + name;
}
indexOutput = shard.store().createOutputRaw(name);

View File: MessageChannelHandler.java

@@ -20,6 +20,7 @@
package org.elasticsearch.transport.netty;
import org.elasticsearch.ElasticSearchIllegalStateException;
import org.elasticsearch.common.component.Lifecycle;
import org.elasticsearch.common.compress.Compressor;
import org.elasticsearch.common.compress.CompressorFactory;
import org.elasticsearch.common.io.ThrowableObjectInputStream;
@@ -388,11 +389,14 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler {
try {
handler.messageReceived(streamable, transportChannel);
} catch (Throwable e) {
try {
transportChannel.sendResponse(e);
} catch (IOException e1) {
logger.warn("Failed to send error message back to client for action [" + action + "]", e1);
logger.warn("Actual Exception", e);
if (transport.lifecycleState() == Lifecycle.State.STARTED) {
// we can only send a response if the transport is started....
try {
transportChannel.sendResponse(e);
} catch (IOException e1) {
logger.warn("Failed to send error message back to client for action [" + action + "]", e1);
logger.warn("Actual Exception", e);
}
}
}
}

View File: QuickRollingRestartStressTest.java

@@ -0,0 +1,120 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.test.stress.rollingrestart;
import jsr166y.ThreadLocalRandom;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.common.RandomStringGenerator;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.SizeValue;
import org.elasticsearch.node.Node;
import org.elasticsearch.node.NodeBuilder;
import java.util.Date;
/**
*/
public class QuickRollingRestartStressTest {
public static void main(String[] args) throws Exception {
System.setProperty("es.logger.prefix", "");
Settings settings = ImmutableSettings.settingsBuilder().build();
Node[] nodes = new Node[5];
for (int i = 0; i < nodes.length; i++) {
nodes[i] = NodeBuilder.nodeBuilder().settings(settings).node();
}
Node client = NodeBuilder.nodeBuilder().client(true).node();
long COUNT;
if (client.client().admin().indices().prepareExists("test").execute().actionGet().exists()) {
ClusterHealthResponse clusterHealthResponse = client.client().admin().cluster().prepareHealth().setWaitForGreenStatus().setTimeout("10m").execute().actionGet();
if (clusterHealthResponse.timedOut()) {
throw new ElasticSearchException("failed to wait for green state on startup...");
}
COUNT = client.client().prepareCount().execute().actionGet().count();
System.out.println("--> existing index, count [" + COUNT + "]");
} else {
COUNT = SizeValue.parseSizeValue("100k").singles();
System.out.println("--> indexing data...");
for (long i = 0; i < COUNT; i++) {
client.client().prepareIndex("test", "type", Long.toString(i))
.setSource("date", new Date(), "data", RandomStringGenerator.randomAlphabetic(10000))
.execute().actionGet();
}
System.out.println("--> done indexing data [" + COUNT + "]");
client.client().admin().indices().prepareRefresh().execute().actionGet();
for (int i = 0; i < 10; i++) {
long count = client.client().prepareCount().execute().actionGet().count();
if (COUNT != count) {
System.err.println("--> the indexed docs do not match the count..., got [" + count + "], expected [" + COUNT + "]");
}
}
}
final int ROLLING_RESTARTS = 100;
System.out.println("--> starting rolling restarts [" + ROLLING_RESTARTS + "]");
for (int rollingRestart = 0; rollingRestart < ROLLING_RESTARTS; rollingRestart++) {
System.out.println("--> doing rolling restart [" + rollingRestart + "]...");
int nodeId = ThreadLocalRandom.current().nextInt();
for (int i = 0; i < nodes.length; i++) {
int nodeIdx = Math.abs(nodeId++) % nodes.length;
nodes[nodeIdx].close();
nodes[nodeIdx] = NodeBuilder.nodeBuilder().settings(settings).node();
}
System.out.println("--> done rolling restart [" + rollingRestart + "]");
System.out.println("--> waiting for green state now...");
ClusterHealthResponse clusterHealthResponse = client.client().admin().cluster().prepareHealth().setWaitForGreenStatus().setTimeout("10m").execute().actionGet();
if (clusterHealthResponse.timedOut()) {
System.err.println("--> timed out waiting for green state...");
ClusterState state = client.client().admin().cluster().prepareState().execute().actionGet().state();
System.out.println(state.routingTable().prettyPrint());
throw new ElasticSearchException("timed out waiting for green state");
} else {
System.out.println("--> got green status");
}
System.out.println("--> checking data [" + rollingRestart + "]....");
boolean failed = false;
for (int i = 0; i < 10; i++) {
long count = client.client().prepareCount().execute().actionGet().count();
if (COUNT != count) {
failed = true;
System.err.println("--> ERROR the indexed docs do not match the count..., got [" + count + "], expected [" + COUNT + "]");
}
}
if (!failed) {
System.out.println("--> count verified");
}
}
System.out.println("--> shutting down...");
client.close();
for (Node node : nodes) {
node.close();
}
}
}