auto_expand_replicas: [0-auto] can cause data loss when nodes are removed, closes #934.

This is caused because of a race condition between when to handle the removed node and move a replica to a primary mode, and when to remove the replica because of the 0-auto setting.
This commit is contained in:
kimchy 2011-05-17 01:41:05 +03:00
parent c9aca9c6de
commit 518488b0b2
4 changed files with 69 additions and 27 deletions

View File

@ -16,6 +16,16 @@
<option name="JD_P_AT_EMPTY_LINES" value="false" /> <option name="JD_P_AT_EMPTY_LINES" value="false" />
<option name="METHOD_ANNOTATION_WRAP" value="0" /> <option name="METHOD_ANNOTATION_WRAP" value="0" />
<option name="FIELD_ANNOTATION_WRAP" value="0" /> <option name="FIELD_ANNOTATION_WRAP" value="0" />
<ADDITIONAL_INDENT_OPTIONS fileType="css">
<option name="INDENT_SIZE" value="4" />
<option name="CONTINUATION_INDENT_SIZE" value="8" />
<option name="TAB_SIZE" value="4" />
<option name="USE_TAB_CHARACTER" value="false" />
<option name="SMART_TABS" value="false" />
<option name="LABEL_INDENT_SIZE" value="0" />
<option name="LABEL_INDENT_ABSOLUTE" value="false" />
<option name="USE_RELATIVE_INDENTS" value="false" />
</ADDITIONAL_INDENT_OPTIONS>
<ADDITIONAL_INDENT_OPTIONS fileType="groovy"> <ADDITIONAL_INDENT_OPTIONS fileType="groovy">
<option name="INDENT_SIZE" value="4" /> <option name="INDENT_SIZE" value="4" />
<option name="CONTINUATION_INDENT_SIZE" value="8" /> <option name="CONTINUATION_INDENT_SIZE" value="8" />
@ -36,6 +46,16 @@
<option name="LABEL_INDENT_ABSOLUTE" value="false" /> <option name="LABEL_INDENT_ABSOLUTE" value="false" />
<option name="USE_RELATIVE_INDENTS" value="false" /> <option name="USE_RELATIVE_INDENTS" value="false" />
</ADDITIONAL_INDENT_OPTIONS> </ADDITIONAL_INDENT_OPTIONS>
<ADDITIONAL_INDENT_OPTIONS fileType="haml">
<option name="INDENT_SIZE" value="2" />
<option name="CONTINUATION_INDENT_SIZE" value="8" />
<option name="TAB_SIZE" value="4" />
<option name="USE_TAB_CHARACTER" value="false" />
<option name="SMART_TABS" value="false" />
<option name="LABEL_INDENT_SIZE" value="0" />
<option name="LABEL_INDENT_ABSOLUTE" value="false" />
<option name="USE_RELATIVE_INDENTS" value="false" />
</ADDITIONAL_INDENT_OPTIONS>
<ADDITIONAL_INDENT_OPTIONS fileType="java"> <ADDITIONAL_INDENT_OPTIONS fileType="java">
<option name="INDENT_SIZE" value="4" /> <option name="INDENT_SIZE" value="4" />
<option name="CONTINUATION_INDENT_SIZE" value="8" /> <option name="CONTINUATION_INDENT_SIZE" value="8" />
@ -126,6 +146,11 @@
<option name="LABEL_INDENT_ABSOLUTE" value="false" /> <option name="LABEL_INDENT_ABSOLUTE" value="false" />
<option name="USE_RELATIVE_INDENTS" value="false" /> <option name="USE_RELATIVE_INDENTS" value="false" />
</ADDITIONAL_INDENT_OPTIONS> </ADDITIONAL_INDENT_OPTIONS>
<codeStyleSettings language="JavaScript">
<option name="METHOD_ANNOTATION_WRAP" value="0" />
<option name="FIELD_ANNOTATION_WRAP" value="0" />
<option name="PARENT_SETTINGS_INSTALLED" value="true" />
</codeStyleSettings>
</value> </value>
</option> </option>
<option name="USE_PER_PROJECT_SETTINGS" value="true" /> <option name="USE_PER_PROJECT_SETTINGS" value="true" />

View File

@ -54,6 +54,11 @@ public interface ClusterService extends LifecycleComponent<ClusterService> {
*/ */
OperationRouting operationRouting(); OperationRouting operationRouting();
/**
* Adds a priority listener for updated cluster states.
*/
void addPriority(ClusterStateListener listener);
/** /**
* Adds a listener for updated cluster states. * Adds a listener for updated cluster states.
*/ */

View File

@ -39,6 +39,8 @@ import static org.elasticsearch.common.unit.TimeValue.*;
*/ */
public class RoutingService extends AbstractLifecycleComponent<RoutingService> implements ClusterStateListener { public class RoutingService extends AbstractLifecycleComponent<RoutingService> implements ClusterStateListener {
private static final String CLUSTER_UPDATE_TASK_SOURCE = "routing-table-updater";
private final ThreadPool threadPool; private final ThreadPool threadPool;
private final ClusterService clusterService; private final ClusterService clusterService;
@ -60,7 +62,7 @@ public class RoutingService extends AbstractLifecycleComponent<RoutingService> i
} }
@Override protected void doStart() throws ElasticSearchException { @Override protected void doStart() throws ElasticSearchException {
clusterService.add(this); clusterService.addPriority(this);
} }
@Override protected void doStop() throws ElasticSearchException { @Override protected void doStop() throws ElasticSearchException {
@ -75,7 +77,7 @@ public class RoutingService extends AbstractLifecycleComponent<RoutingService> i
} }
@Override public void clusterChanged(ClusterChangedEvent event) { @Override public void clusterChanged(ClusterChangedEvent event) {
if (event.source().equals(RoutingTableUpdater.CLUSTER_UPDATE_TASK_SOURCE)) { if (event.source().equals(CLUSTER_UPDATE_TASK_SOURCE)) {
// that's us, ignore this event // that's us, ignore this event
return; return;
} }
@ -93,7 +95,7 @@ public class RoutingService extends AbstractLifecycleComponent<RoutingService> i
// also, if the routing table changed, it means that we have new indices, or shard have started // also, if the routing table changed, it means that we have new indices, or shard have started
// or failed, we want to apply this as fast as possible // or failed, we want to apply this as fast as possible
routingTableDirty = true; routingTableDirty = true;
threadPool.cached().execute(new RoutingTableUpdater()); reroute();
} else { } else {
if (event.nodesAdded()) { if (event.nodesAdded()) {
routingTableDirty = true; routingTableDirty = true;
@ -107,32 +109,34 @@ public class RoutingService extends AbstractLifecycleComponent<RoutingService> i
} }
} }
private void reroute() {
try {
if (!routingTableDirty) {
return;
}
if (lifecycle.stopped()) {
return;
}
clusterService.submitStateUpdateTask(CLUSTER_UPDATE_TASK_SOURCE, new ClusterStateUpdateTask() {
@Override public ClusterState execute(ClusterState currentState) {
RoutingAllocation.Result routingResult = shardsAllocation.reroute(currentState);
if (!routingResult.changed()) {
// no state changed
return currentState;
}
return newClusterStateBuilder().state(currentState).routingResult(routingResult).build();
}
});
routingTableDirty = false;
} catch (Exception e) {
logger.warn("Failed to reroute routing table", e);
}
}
private class RoutingTableUpdater implements Runnable { private class RoutingTableUpdater implements Runnable {
private static final String CLUSTER_UPDATE_TASK_SOURCE = "routing-table-updater";
@Override public void run() { @Override public void run() {
try { reroute();
if (!routingTableDirty) {
return;
}
if (lifecycle.stopped()) {
return;
}
clusterService.submitStateUpdateTask(CLUSTER_UPDATE_TASK_SOURCE, new ClusterStateUpdateTask() {
@Override public ClusterState execute(ClusterState currentState) {
RoutingAllocation.Result routingResult = shardsAllocation.reroute(currentState);
if (!routingResult.changed()) {
// no state changed
return currentState;
}
return newClusterStateBuilder().state(currentState).routingResult(routingResult).build();
}
});
routingTableDirty = false;
} catch (Exception e) {
logger.warn("Failed to reroute routing table", e);
}
} }
} }
} }

View File

@ -65,6 +65,7 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
private volatile ExecutorService updateTasksExecutor; private volatile ExecutorService updateTasksExecutor;
private final List<ClusterStateListener> priorityClusterStateListeners = new CopyOnWriteArrayList<ClusterStateListener>();
private final List<ClusterStateListener> clusterStateListeners = new CopyOnWriteArrayList<ClusterStateListener>(); private final List<ClusterStateListener> clusterStateListeners = new CopyOnWriteArrayList<ClusterStateListener>();
private final Queue<NotifyTimeout> onGoingTimeouts = new LinkedTransferQueue<NotifyTimeout>(); private final Queue<NotifyTimeout> onGoingTimeouts = new LinkedTransferQueue<NotifyTimeout>();
@ -127,13 +128,17 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
return this.clusterState; return this.clusterState;
} }
public void addPriority(ClusterStateListener listener) {
priorityClusterStateListeners.add(listener);
}
public void add(ClusterStateListener listener) { public void add(ClusterStateListener listener) {
clusterStateListeners.add(listener); clusterStateListeners.add(listener);
} }
public void remove(ClusterStateListener listener) { public void remove(ClusterStateListener listener) {
clusterStateListeners.remove(listener); clusterStateListeners.remove(listener);
for (Iterator<NotifyTimeout> it = onGoingTimeouts.iterator(); it.hasNext();) { for (Iterator<NotifyTimeout> it = onGoingTimeouts.iterator(); it.hasNext(); ) {
NotifyTimeout timeout = it.next(); NotifyTimeout timeout = it.next();
if (timeout.listener.equals(listener)) { if (timeout.listener.equals(listener)) {
timeout.cancel(); timeout.cancel();
@ -226,6 +231,9 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
} }
} }
for (ClusterStateListener listener : priorityClusterStateListeners) {
listener.clusterChanged(clusterChangedEvent);
}
for (ClusterStateListener listener : clusterStateListeners) { for (ClusterStateListener listener : clusterStateListeners) {
listener.clusterChanged(clusterChangedEvent); listener.clusterChanged(clusterChangedEvent);
} }