diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java index d651ce5365a..bc63ba5944e 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java @@ -259,9 +259,9 @@ public class ShardChangesAction extends Action { @Override protected ShardsIterator shards(ClusterState state, InternalRequest request) { - return state.routingTable() - .index(request.concreteIndex()) - .shard(request.request().getShard().id()) + return state + .routingTable() + .shardRoutingTable(request.concreteIndex(), request.request().getShard().id()) .activeInitializingShardsRandomIt(); } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesActionTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesActionTests.java index 079b670466a..ac6d8f786fb 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesActionTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesActionTests.java @@ -5,27 +5,40 @@ */ package org.elasticsearch.xpack.ccr.action; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.cluster.routing.TestShardRouting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.index.Index; +import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.IndexService; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardNotStartedException; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.shard.ShardNotFoundException; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.test.ESSingleNodeTestCase; import org.mockito.Mockito; import java.util.Arrays; import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import java.util.stream.LongStream; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.instanceOf; public class ShardChangesActionTests extends ESSingleNodeTestCase { + @Override + protected boolean resetNodeAfterTest() { + return true; + } + public void testGetOperations() throws Exception { final Settings settings = Settings.builder() .put("index.number_of_shards", 1) @@ -119,4 +132,52 @@ public class ShardChangesActionTests extends ESSingleNodeTestCase { assertThat(operations[0].seqNo(), equalTo(0L)); } + public void testIndexNotFound() throws InterruptedException { + final CountDownLatch latch = new CountDownLatch(1); + final AtomicReference reference = new AtomicReference<>(); + final ShardChangesAction.TransportAction transportAction = node().injector().getInstance(ShardChangesAction.TransportAction.class); + transportAction.execute( + new ShardChangesAction.Request(new ShardId(new Index("non-existent", "uuid"), 0)), + new ActionListener() { + @Override + public void onResponse(final ShardChangesAction.Response response) { + fail(); + } + + @Override + public void onFailure(final Exception e) { + reference.set(e); + latch.countDown(); + } + }); + latch.await(); + assertNotNull(reference.get()); + assertThat(reference.get(), instanceOf(IndexNotFoundException.class)); + } + + public void testShardNotFound() throws InterruptedException { + final int numberOfShards = randomIntBetween(1, 5); + final IndexService indexService = createIndex("index", Settings.builder().put("index.number_of_shards", numberOfShards).build()); + final CountDownLatch latch = new CountDownLatch(1); + final AtomicReference reference = new AtomicReference<>(); + final ShardChangesAction.TransportAction transportAction = node().injector().getInstance(ShardChangesAction.TransportAction.class); + transportAction.execute( + new ShardChangesAction.Request(new ShardId(indexService.getMetaData().getIndex(), numberOfShards)), + new ActionListener() { + @Override + public void onResponse(final ShardChangesAction.Response response) { + fail(); + } + + @Override + public void onFailure(final Exception e) { + reference.set(e); + latch.countDown(); + } + }); + latch.await(); + assertNotNull(reference.get()); + assertThat(reference.get(), instanceOf(ShardNotFoundException.class)); + } + }