Avoid NPE on shard changes action (#32630)

If a leader index is deleted while there is an active follower, the
follower will send shard changes requests bound for the leader
index. Today this will result in a null pointer exception because there
will not be an index routing table for the index. A null pointer
exception looks like a bug to a user so this commit addresses this by
throwing an index not found exception instead.
This commit is contained in:
Jason Tedor 2018-08-06 08:01:47 -04:00 committed by GitHub
parent 1a39f1d6c5
commit 3b739b9fd5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 64 additions and 3 deletions

View File

@ -259,9 +259,9 @@ public class ShardChangesAction extends Action<ShardChangesAction.Response> {
@Override @Override
protected ShardsIterator shards(ClusterState state, InternalRequest request) { protected ShardsIterator shards(ClusterState state, InternalRequest request) {
return state.routingTable() return state
.index(request.concreteIndex()) .routingTable()
.shard(request.request().getShard().id()) .shardRoutingTable(request.concreteIndex(), request.request().getShard().id())
.activeInitializingShardsRandomIt(); .activeInitializingShardsRandomIt();
} }

View File

@ -5,27 +5,40 @@
*/ */
package org.elasticsearch.xpack.ccr.action; package org.elasticsearch.xpack.ccr.action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.TestShardRouting; import org.elasticsearch.cluster.routing.TestShardRouting;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.IndexService; import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardNotStartedException; import org.elasticsearch.index.shard.IndexShardNotStartedException;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardNotFoundException;
import org.elasticsearch.index.translog.Translog; import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.test.ESSingleNodeTestCase; import org.elasticsearch.test.ESSingleNodeTestCase;
import org.mockito.Mockito; import org.mockito.Mockito;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.LongStream; import java.util.stream.LongStream;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
public class ShardChangesActionTests extends ESSingleNodeTestCase { public class ShardChangesActionTests extends ESSingleNodeTestCase {
@Override
protected boolean resetNodeAfterTest() {
return true;
}
public void testGetOperations() throws Exception { public void testGetOperations() throws Exception {
final Settings settings = Settings.builder() final Settings settings = Settings.builder()
.put("index.number_of_shards", 1) .put("index.number_of_shards", 1)
@ -119,4 +132,52 @@ public class ShardChangesActionTests extends ESSingleNodeTestCase {
assertThat(operations[0].seqNo(), equalTo(0L)); assertThat(operations[0].seqNo(), equalTo(0L));
} }
public void testIndexNotFound() throws InterruptedException {
final CountDownLatch latch = new CountDownLatch(1);
final AtomicReference<Exception> reference = new AtomicReference<>();
final ShardChangesAction.TransportAction transportAction = node().injector().getInstance(ShardChangesAction.TransportAction.class);
transportAction.execute(
new ShardChangesAction.Request(new ShardId(new Index("non-existent", "uuid"), 0)),
new ActionListener<ShardChangesAction.Response>() {
@Override
public void onResponse(final ShardChangesAction.Response response) {
fail();
}
@Override
public void onFailure(final Exception e) {
reference.set(e);
latch.countDown();
}
});
latch.await();
assertNotNull(reference.get());
assertThat(reference.get(), instanceOf(IndexNotFoundException.class));
}
public void testShardNotFound() throws InterruptedException {
final int numberOfShards = randomIntBetween(1, 5);
final IndexService indexService = createIndex("index", Settings.builder().put("index.number_of_shards", numberOfShards).build());
final CountDownLatch latch = new CountDownLatch(1);
final AtomicReference<Exception> reference = new AtomicReference<>();
final ShardChangesAction.TransportAction transportAction = node().injector().getInstance(ShardChangesAction.TransportAction.class);
transportAction.execute(
new ShardChangesAction.Request(new ShardId(indexService.getMetaData().getIndex(), numberOfShards)),
new ActionListener<ShardChangesAction.Response>() {
@Override
public void onResponse(final ShardChangesAction.Response response) {
fail();
}
@Override
public void onFailure(final Exception e) {
reference.set(e);
latch.countDown();
}
});
latch.await();
assertNotNull(reference.get());
assertThat(reference.get(), instanceOf(ShardNotFoundException.class));
}
} }