Also send Refresh and Flush actions to relocation targets

Currently we send relocation & flush actions based on all assigned ShardRoutings. During the final stage of relocation, we may miss to refresh/flush a shard if the coordinating node has not yet processed the cluster state update indicating that a relocation is completed *and* the relocation target node has already processed it (i.e., started the shard and has accepted new indexing requests).

This commit is contained in:
Boaz Leskes 2014-06-17 13:55:35 +02:00
parent 68046b64c2
commit 1114835de5
8 changed files with 183 additions and 14 deletions

@ -120,7 +120,7 @@ public class TransportFlushAction extends TransportBroadcastOperationAction<Flus
protected GroupShardsIterator shards(ClusterState clusterState, FlushRequest request, String[] concreteIndices) {
return clusterState.routingTable().allActiveShardsGrouped(concreteIndices, true);
return clusterState.routingTable().allActiveShardsGrouped(concreteIndices, true, true);

@ -123,7 +123,7 @@ public class TransportRefreshAction extends TransportBroadcastOperationAction<Re
protected GroupShardsIterator shards(ClusterState clusterState, RefreshRequest request, String[] concreteIndices) {
logger.trace("resolving shards to refresh based on cluster state version [{}]", clusterState.version());
return clusterState.routingTable().allAssignedShardsGrouped(concreteIndices, true);
return clusterState.routingTable().allAssignedShardsGrouped(concreteIndices, true, true);

@ -89,6 +89,10 @@ public abstract class TransportBroadcastOperationAction<Request extends Broadcas
protected abstract ShardResponse shardOperation(ShardRequest request) throws ElasticsearchException;
* Determines the shards this operation will be executed on. The operation is executed once per shard iterator, typically
* on the first shard in it. If the operation fails, it will be retried on the next shard in the iterator.
protected abstract GroupShardsIterator shards(ClusterState clusterState, Request request, String[] concreteIndices);
protected abstract ClusterBlockException checkGlobalBlock(ClusterState state, Request request);

@ -165,6 +165,14 @@ public class ImmutableShardRouting implements Streamable, Serializable, ShardRou
return this.relocatingNodeId;
public ShardRouting targetRoutingIfRelocating() {
if (!relocating()) {
return null;
return new ImmutableShardRouting(index, shardId, relocatingNodeId, currentNodeId, primary, ShardRoutingState.INITIALIZING, version);
public RestoreSource restoreSource() {
return restoreSource;
@ -276,20 +284,32 @@ public class ImmutableShardRouting implements Streamable, Serializable, ShardRou
public boolean equals(Object o) {
if (this == o) return true;
if (this == o) {
return true;
// we check on instanceof so we also handle the MutableShardRouting case as well
if (o == null || !(o instanceof ImmutableShardRouting)) return false;
if (o == null || !(o instanceof ImmutableShardRouting)) {
return false;
ImmutableShardRouting that = (ImmutableShardRouting) o;
if (primary != that.primary) return false;
if (shardId != that.shardId) return false;
if (primary != that.primary) {
return false;
if (shardId != that.shardId) {
return false;
if (currentNodeId != null ? !currentNodeId.equals(that.currentNodeId) : that.currentNodeId != null)
return false;
if (index != null ? !index.equals(that.index) : that.index != null) return false;
if (index != null ? !index.equals(that.index) : that.index != null) {
return false;
if (relocatingNodeId != null ? !relocatingNodeId.equals(that.relocatingNodeId) : that.relocatingNodeId != null)
return false;
if (state != that.state) return false;
if (state != that.state) {
return false;
if (restoreSource != null ? !restoreSource.equals(that.restoreSource) : that.restoreSource != null)
return false;

@ -88,7 +88,7 @@ public class IndexShardRoutingTable implements Iterable<ShardRouting> {
if (shard.relocating()) {
// create the target initializing shard routing on the node the shard is relocating to
allInitializingShards.add(new ImmutableShardRouting(shard.index(),, shard.relocatingNodeId(), shard.currentNodeId(), shard.primary(), ShardRoutingState.INITIALIZING, shard.version()));
if (shard.assignedToNode()) {

@ -174,6 +174,17 @@ public class RoutingTable implements Iterable<IndexRoutingTable> {
public GroupShardsIterator allActiveShardsGrouped(String[] indices, boolean includeEmpty) throws IndexMissingException {
return allActiveShardsGrouped(indices, includeEmpty, false);
* Return GroupShardsIterator where each active shard routing has it's own shard iterator.
* @param includeEmpty if true, a shard iterator will be added for non-assigned shards as well
* @param includeRelocationTargets if true, an <b>extra</b> shard iterator will be added for relocating shards. The extra
* iterator contains a single ShardRouting pointing at the relocating target
public GroupShardsIterator allActiveShardsGrouped(String[] indices, boolean includeEmpty, boolean includeRelocationTargets) throws IndexMissingException {
// use list here since we need to maintain identity across shards
ArrayList<ShardIterator> set = new ArrayList<>();
if (indices == null || indices.length == 0) {
@ -190,6 +201,9 @@ public class RoutingTable implements Iterable<IndexRoutingTable> {
for (ShardRouting shardRouting : indexShardRoutingTable) {
if ( {
if (includeRelocationTargets && shardRouting.relocating()) {
set.add(new PlainShardIterator(shardRouting.shardId(), ImmutableList.of(shardRouting.targetRoutingIfRelocating())));
} else if (includeEmpty) { // we need this for counting properly, just make it an empty one
set.add(new PlainShardIterator(shardRouting.shardId(), ImmutableList.<ShardRouting>of()));
@ -200,6 +214,17 @@ public class RoutingTable implements Iterable<IndexRoutingTable> {
public GroupShardsIterator allAssignedShardsGrouped(String[] indices, boolean includeEmpty) throws IndexMissingException {
return allAssignedShardsGrouped(indices, includeEmpty, false);
* Return GroupShardsIterator where each assigned shard routing has it's own shard iterator.
* @param includeEmpty if true, a shard iterator will be added for non-assigned shards as well
* @param includeRelocationTargets if true, an <b>extra</b> shard iterator will be added for relocating shards. The extra
* iterator contains a single ShardRouting pointing at the relocating target
public GroupShardsIterator allAssignedShardsGrouped(String[] indices, boolean includeEmpty, boolean includeRelocationTargets) throws IndexMissingException {
// use list here since we need to maintain identity across shards
ArrayList<ShardIterator> set = new ArrayList<>();
if (indices == null || indices.length == 0) {
@ -216,6 +241,9 @@ public class RoutingTable implements Iterable<IndexRoutingTable> {
for (ShardRouting shardRouting : indexShardRoutingTable) {
if (shardRouting.assignedToNode()) {
if (includeRelocationTargets && shardRouting.relocating()) {
set.add(new PlainShardIterator(shardRouting.shardId(), ImmutableList.of(shardRouting.targetRoutingIfRelocating())));
} else if (includeEmpty) { // we need this for counting properly, just make it an empty one
set.add(new PlainShardIterator(shardRouting.shardId(), ImmutableList.<ShardRouting>of()));

@ -116,6 +116,13 @@ public interface ShardRouting extends Streamable, Serializable, ToXContent {
String relocatingNodeId();
* If the shard is relocating, return a shard routing representing the target shard or null o.w.
* The target shard routing will be the INITIALIZING state and have relocatingNodeId set to the
* source node.
ShardRouting targetRoutingIfRelocating();
* Snapshot id and repository where this shard is being restored from

@ -23,12 +23,20 @@ import com.carrotsearch.hppc.IntOpenHashSet;
import com.carrotsearch.hppc.procedures.IntProcedure;
import org.apache.lucene.util.LuceneTestCase.Slow;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.indices.IndicesLifecycle;
import org.elasticsearch.test.BackgroundIndexer;
@ -36,11 +44,15 @@ import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope;
import org.junit.Test;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static org.elasticsearch.test.ElasticsearchIntegrationTest.*;
import static org.elasticsearch.test.ElasticsearchIntegrationTest.Scope;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
import static org.hamcrest.Matchers.equalTo;
@ -58,8 +70,8 @@ public class RelocationTests extends ElasticsearchIntegrationTest {"--> creating test index ...");
.put("index.number_of_shards", 1)
.put("index.number_of_replicas", 0)
.put("index.number_of_shards", 1)
.put("index.number_of_replicas", 0)
@ -114,8 +126,8 @@ public class RelocationTests extends ElasticsearchIntegrationTest {"--> creating test index ...");
.put("index.number_of_shards", 1)
.put("index.number_of_replicas", numberOfReplicas)
.put("index.number_of_shards", 1)
.put("index.number_of_replicas", numberOfReplicas)
@ -208,4 +220,102 @@ public class RelocationTests extends ElasticsearchIntegrationTest {
public void testRelocationWhileRefreshing() throws Exception {
int numberOfRelocations = scaledRandomIntBetween(1, rarely() ? 10 : 4);
int numberOfReplicas = randomBoolean() ? 0 : 1;
int numberOfNodes = numberOfReplicas == 0 ? 2 : 3;"testRelocationWhileIndexingRandom(numRelocations={}, numberOfReplicas={}, numberOfNodes={})", numberOfRelocations, numberOfReplicas, numberOfNodes);
String[] nodes = new String[numberOfNodes];"--> starting [node1] ...");
nodes[0] = internalCluster().startNode();"--> creating test index ...");
.put("index.number_of_shards", 1)
.put("index.number_of_replicas", numberOfReplicas)
.put("index.refresh_interval", -1) // we want to control refreshes c
for (int i = 1; i < numberOfNodes; i++) {"--> starting [node{}] ...", i + 1);
nodes[i] = internalCluster().startNode();
if (i != numberOfNodes - 1) {
ClusterHealthResponse healthResponse = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID)
.setWaitForNodes(Integer.toString(i + 1)).setWaitForGreenStatus().execute().actionGet();
assertThat(healthResponse.isTimedOut(), equalTo(false));
final Semaphore postRecoveryShards = new Semaphore(0);
for (IndicesLifecycle indicesLifecycle : internalCluster().getInstances(IndicesLifecycle.class)) {
indicesLifecycle.addListener(new IndicesLifecycle.Listener() {
public void indexShardStateChanged(IndexShard indexShard, @Nullable IndexShardState previousState, IndexShardState currentState, @Nullable String reason) {
if (currentState == IndexShardState.POST_RECOVERY) {
}"--> starting relocations...");
int nodeShiftBased = numberOfReplicas; // if we have replicas shift those
for (int i = 0; i < numberOfRelocations; i++) {
int fromNode = (i % 2);
int toNode = fromNode == 0 ? 1 : 0;
fromNode += nodeShiftBased;
toNode += nodeShiftBased;
List<IndexRequestBuilder> builders1 = new ArrayList<>();
for (int numDocs = randomIntBetween(10, 30); numDocs > 0; numDocs--) {
builders1.add(client().prepareIndex("test", "type").setSource("{}"));
List<IndexRequestBuilder> builders2 = new ArrayList<>();
for (int numDocs = randomIntBetween(10, 30); numDocs > 0; numDocs--) {
builders2.add(client().prepareIndex("test", "type").setSource("{}"));
}"--> START relocate the shard from {} to {}", nodes[fromNode], nodes[toNode]);
.add(new MoveAllocationCommand(new ShardId("test", 0), nodes[fromNode], nodes[toNode]))
logger.debug("--> index [{}] documents", builders1.size());
indexRandom(false, true, builders1);
// wait for shard to reach post recovery
logger.debug("--> index [{}] documents", builders2.size());
indexRandom(true, true, builders2);
// verify cluster was finished.
assertFalse(client().admin().cluster().prepareHealth().setWaitForRelocatingShards(0).setTimeout("30s").get().isTimedOut());"--> DONE relocate the shard from {} to {}", fromNode, toNode);
logger.debug("--> verifying all searches return the same number of docs");
long expectedCount = -1;
for (Client client : clients()) {
SearchResponse response = client.prepareSearch("test").setPreference("_local").setSearchType(SearchType.COUNT).get();
if (expectedCount < 0) {
expectedCount = response.getHits().totalHits();
} else {
assertEquals(expectedCount, response.getHits().totalHits());