Add unit tests for ShardStateAction's ShardStartedClusterStateTaskExecutor (#37756)

This commit is contained in:
Tanguy Leroux 2019-01-25 16:51:53 +01:00 committed by GitHub
parent dfecb256cb
commit a644bc095c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 281 additions and 79 deletions

View File

@ -24,6 +24,8 @@ import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.cluster.routing.allocation.AllocationDecision;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.ShardAllocationDecision;
import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocator;
@ -35,6 +37,7 @@ import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.gateway.TestGatewayAllocator;
import java.time.Instant;
import java.util.Collections;
import java.util.Locale;
@ -85,7 +88,16 @@ public class ClusterAllocationExplainActionTests extends ESTestCase {
"wait until initialization has completed";
}
assertEquals("{\"index\":\"idx\",\"shard\":0,\"primary\":true,\"current_state\":\"" +
shardRoutingState.toString().toLowerCase(Locale.ROOT) + "\",\"current_node\":" +
shardRoutingState.toString().toLowerCase(Locale.ROOT) + "\"" +
(shard.unassignedInfo() != null ?
",\"unassigned_info\":{"
+ "\"reason\":\"" + shard.unassignedInfo().getReason() + "\","
+ "\"at\":\""+ UnassignedInfo.DATE_TIME_FORMATTER.format(
Instant.ofEpochMilli(shard.unassignedInfo().getUnassignedTimeInMillis())) + "\","
+ "\"last_allocation_status\":\"" + AllocationDecision.fromAllocationStatus(
shard.unassignedInfo().getLastAllocationStatus()) + "\"}"
: "")
+ ",\"current_node\":" +
"{\"id\":\"" + cae.getCurrentNode().getId() + "\",\"name\":\"" + cae.getCurrentNode().getName() +
"\",\"transport_address\":\"" + cae.getCurrentNode().getAddress() +
"\"},\"explanation\":\"" + explanation + "\"}", Strings.toString(builder));

View File

@ -118,6 +118,8 @@ public class ClusterStateCreationUtils {
}
if (primaryState == ShardRoutingState.RELOCATING) {
relocatingNode = selectAndRemove(unassignedNodes);
} else if (primaryState == ShardRoutingState.INITIALIZING) {
unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, null);
}
} else {
unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, null);

View File

@ -48,7 +48,6 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.ShardId;
import org.junit.Before;
import java.util.ArrayList;
import java.util.Arrays;
@ -73,7 +72,7 @@ public class ShardFailedClusterStateTaskExecutorTests extends ESAllocationTestCa
private ClusterState clusterState;
private ShardStateAction.ShardFailedClusterStateTaskExecutor executor;
@Before
@Override
public void setUp() throws Exception {
super.setUp();
allocationService = createAllocationService(Settings.builder()

View File

@ -0,0 +1,197 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.action.shard;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateTaskExecutor;
import org.elasticsearch.cluster.ESAllocationTestCase;
import org.elasticsearch.cluster.action.shard.ShardStateAction.StartedShardEntry;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.shard.ShardId;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import static java.util.Collections.singletonList;
import static org.elasticsearch.action.support.replication.ClusterStateCreationUtils.state;
import static org.elasticsearch.action.support.replication.ClusterStateCreationUtils.stateWithActivePrimary;
import static org.elasticsearch.action.support.replication.ClusterStateCreationUtils.stateWithAssignedPrimariesAndReplicas;
import static org.elasticsearch.action.support.replication.ClusterStateCreationUtils.stateWithNoShard;
import static org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
public class ShardStartedClusterStateTaskExecutorTests extends ESAllocationTestCase {
private ShardStateAction.ShardStartedClusterStateTaskExecutor executor;
@Override
public void setUp() throws Exception {
super.setUp();
AllocationService allocationService = createAllocationService(Settings.builder()
.put(CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING.getKey(), Integer.MAX_VALUE)
.build());
executor = new ShardStateAction.ShardStartedClusterStateTaskExecutor(allocationService, logger);
}
public void testEmptyTaskListProducesSameClusterState() throws Exception {
final ClusterState clusterState = stateWithNoShard();
assertTasksExecution(clusterState, Collections.emptyList(), result -> assertSame(clusterState, result.resultingState));
}
public void testNonExistentIndexMarkedAsSuccessful() throws Exception {
final ClusterState clusterState = stateWithNoShard();
final StartedShardEntry entry = new StartedShardEntry(new ShardId("test", "_na", 0), "aId", "test");
assertTasksExecution(clusterState, singletonList(entry),
result -> {
assertSame(clusterState, result.resultingState);
assertThat(result.executionResults.size(), equalTo(1));
assertThat(result.executionResults.containsKey(entry), is(true));
assertThat(((ClusterStateTaskExecutor.TaskResult) result.executionResults.get(entry)).isSuccess(), is(true));
});
}
public void testNonExistentShardsAreMarkedAsSuccessful() throws Exception {
final String indexName = "test";
final ClusterState clusterState = stateWithActivePrimary(indexName, true, randomInt(2), randomInt(2));
final IndexMetaData indexMetaData = clusterState.metaData().index(indexName);
final List<StartedShardEntry> tasks = Stream.concat(
// Existent shard id but different allocation id
IntStream.range(0, randomIntBetween(1, 5))
.mapToObj(i -> new StartedShardEntry(new ShardId(indexMetaData.getIndex(), 0), String.valueOf(i), "allocation id")),
// Non existent shard id
IntStream.range(1, randomIntBetween(2, 5))
.mapToObj(i -> new StartedShardEntry(new ShardId(indexMetaData.getIndex(), i), String.valueOf(i), "shard id"))
).collect(Collectors.toList());
assertTasksExecution(clusterState, tasks, result -> {
assertSame(clusterState, result.resultingState);
assertThat(result.executionResults.size(), equalTo(tasks.size()));
tasks.forEach(task -> {
assertThat(result.executionResults.containsKey(task), is(true));
assertThat(((ClusterStateTaskExecutor.TaskResult) result.executionResults.get(task)).isSuccess(), is(true));
});
});
}
public void testNonInitializingShardAreMarkedAsSuccessful() throws Exception {
final String indexName = "test";
final ClusterState clusterState = stateWithAssignedPrimariesAndReplicas(new String[]{indexName}, randomIntBetween(2, 10), 1);
final IndexMetaData indexMetaData = clusterState.metaData().index(indexName);
final List<StartedShardEntry> tasks = IntStream.range(0, randomIntBetween(1, indexMetaData.getNumberOfShards()))
.mapToObj(i -> {
final ShardId shardId = new ShardId(indexMetaData.getIndex(), i);
final IndexShardRoutingTable shardRoutingTable = clusterState.routingTable().shardRoutingTable(shardId);
final String allocationId;
if (randomBoolean()) {
allocationId = shardRoutingTable.primaryShard().allocationId().getId();
} else {
allocationId = shardRoutingTable.replicaShards().iterator().next().allocationId().getId();
}
return new StartedShardEntry(shardId, allocationId, "test");
}).collect(Collectors.toList());
assertTasksExecution(clusterState, tasks, result -> {
assertSame(clusterState, result.resultingState);
assertThat(result.executionResults.size(), equalTo(tasks.size()));
tasks.forEach(task -> {
assertThat(result.executionResults.containsKey(task), is(true));
assertThat(((ClusterStateTaskExecutor.TaskResult) result.executionResults.get(task)).isSuccess(), is(true));
});
});
}
public void testStartedShards() throws Exception {
final String indexName = "test";
final ClusterState clusterState = state(indexName, randomBoolean(), ShardRoutingState.INITIALIZING, ShardRoutingState.INITIALIZING);
final IndexMetaData indexMetaData = clusterState.metaData().index(indexName);
final ShardId shardId = new ShardId(indexMetaData.getIndex(), 0);
final ShardRouting primaryShard = clusterState.routingTable().shardRoutingTable(shardId).primaryShard();
final String primaryAllocationId = primaryShard.allocationId().getId();
final List<StartedShardEntry> tasks = new ArrayList<>();
tasks.add(new StartedShardEntry(shardId, primaryAllocationId, "test"));
if (randomBoolean()) {
final ShardRouting replicaShard = clusterState.routingTable().shardRoutingTable(shardId).replicaShards().iterator().next();
final String replicaAllocationId = replicaShard.allocationId().getId();
tasks.add(new StartedShardEntry(shardId, replicaAllocationId, "test"));
}
assertTasksExecution(clusterState, tasks, result -> {
assertNotSame(clusterState, result.resultingState);
assertThat(result.executionResults.size(), equalTo(tasks.size()));
tasks.forEach(task -> {
assertThat(result.executionResults.containsKey(task), is(true));
assertThat(((ClusterStateTaskExecutor.TaskResult) result.executionResults.get(task)).isSuccess(), is(true));
final IndexShardRoutingTable shardRoutingTable = result.resultingState.routingTable().shardRoutingTable(task.shardId);
assertThat(shardRoutingTable.getByAllocationId(task.allocationId).state(), is(ShardRoutingState.STARTED));
});
});
}
public void testDuplicateStartsAreOkay() throws Exception {
final String indexName = "test";
final ClusterState clusterState = state(indexName, randomBoolean(), ShardRoutingState.INITIALIZING);
final IndexMetaData indexMetaData = clusterState.metaData().index(indexName);
final ShardId shardId = new ShardId(indexMetaData.getIndex(), 0);
final ShardRouting shardRouting = clusterState.routingTable().shardRoutingTable(shardId).primaryShard();
final String allocationId = shardRouting.allocationId().getId();
final List<StartedShardEntry> tasks = IntStream.range(0, randomIntBetween(2, 10))
.mapToObj(i -> new StartedShardEntry(shardId, allocationId, "test"))
.collect(Collectors.toList());
assertTasksExecution(clusterState, tasks, result -> {
assertNotSame(clusterState, result.resultingState);
assertThat(result.executionResults.size(), equalTo(tasks.size()));
tasks.forEach(task -> {
assertThat(result.executionResults.containsKey(task), is(true));
assertThat(((ClusterStateTaskExecutor.TaskResult) result.executionResults.get(task)).isSuccess(), is(true));
final IndexShardRoutingTable shardRoutingTable = result.resultingState.routingTable().shardRoutingTable(task.shardId);
assertThat(shardRoutingTable.getByAllocationId(task.allocationId).state(), is(ShardRoutingState.STARTED));
});
});
}
private void assertTasksExecution(final ClusterState state,
final List<StartedShardEntry> tasks,
final Consumer<ClusterStateTaskExecutor.ClusterTasksResult> consumer) throws Exception {
final ClusterStateTaskExecutor.ClusterTasksResult<StartedShardEntry> result = executor.execute(state, tasks);
assertThat(result, notNullValue());
consumer.accept(result);
}
}

View File

@ -20,6 +20,7 @@
package org.elasticsearch.cluster.action.shard;
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.Version;
import org.elasticsearch.action.support.replication.ClusterStateCreationUtils;
import org.elasticsearch.cluster.ClusterState;
@ -156,24 +157,9 @@ public class ShardStateActionTests extends ESTestCase {
setState(clusterService, ClusterStateCreationUtils.stateWithActivePrimary(index, true, randomInt(5)));
AtomicBoolean success = new AtomicBoolean();
CountDownLatch latch = new CountDownLatch(1);
final TestListener listener = new TestListener();
ShardRouting shardRouting = getRandomShardRouting(index);
shardStateAction.localShardFailed(shardRouting, "test", getSimulatedFailure(), new ShardStateAction.Listener() {
@Override
public void onSuccess() {
success.set(true);
latch.countDown();
}
@Override
public void onFailure(Exception e) {
success.set(false);
latch.countDown();
assert false;
}
});
shardStateAction.localShardFailed(shardRouting, "test", getSimulatedFailure(), listener);
CapturingTransport.CapturedRequest[] capturedRequests = transport.getCapturedRequestsAndClear();
assertEquals(1, capturedRequests.length);
@ -188,8 +174,8 @@ public class ShardStateActionTests extends ESTestCase {
transport.handleResponse(capturedRequests[0].requestId, TransportResponse.Empty.INSTANCE);
latch.await();
assertTrue(success.get());
listener.await();
assertNull(listener.failure.get());
}
public void testNoMaster() throws InterruptedException {
@ -291,28 +277,14 @@ public class ShardStateActionTests extends ESTestCase {
setState(clusterService, ClusterStateCreationUtils.stateWithActivePrimary(index, true, randomInt(5)));
AtomicBoolean failure = new AtomicBoolean();
final TestListener listener = new TestListener();
ShardRouting failedShard = getRandomShardRouting(index);
shardStateAction.localShardFailed(failedShard, "test", getSimulatedFailure(), new ShardStateAction.Listener() {
@Override
public void onSuccess() {
failure.set(false);
assert false;
}
@Override
public void onFailure(Exception e) {
failure.set(true);
}
});
shardStateAction.localShardFailed(failedShard, "test", getSimulatedFailure(), listener);
final CapturingTransport.CapturedRequest[] capturedRequests = transport.getCapturedRequestsAndClear();
assertThat(capturedRequests.length, equalTo(1));
assertFalse(failure.get());
transport.handleRemoteError(capturedRequests[0].requestId, new TransportException("simulated"));
assertTrue(failure.get());
assertNotNull(listener.failure.get());
}
public void testShardNotFound() throws InterruptedException {
@ -320,32 +292,17 @@ public class ShardStateActionTests extends ESTestCase {
setState(clusterService, ClusterStateCreationUtils.stateWithActivePrimary(index, true, randomInt(5)));
AtomicBoolean success = new AtomicBoolean();
CountDownLatch latch = new CountDownLatch(1);
final TestListener listener = new TestListener();
ShardRouting failedShard = getRandomShardRouting(index);
RoutingTable routingTable = RoutingTable.builder(clusterService.state().getRoutingTable()).remove(index).build();
setState(clusterService, ClusterState.builder(clusterService.state()).routingTable(routingTable));
shardStateAction.localShardFailed(failedShard, "test", getSimulatedFailure(), new ShardStateAction.Listener() {
@Override
public void onSuccess() {
success.set(true);
latch.countDown();
}
@Override
public void onFailure(Exception e) {
success.set(false);
latch.countDown();
assert false;
}
});
shardStateAction.localShardFailed(failedShard, "test", getSimulatedFailure(), listener);
CapturingTransport.CapturedRequest[] capturedRequests = transport.getCapturedRequestsAndClear();
transport.handleResponse(capturedRequests[0].requestId, TransportResponse.Empty.INSTANCE);
latch.await();
assertTrue(success.get());
listener.await();
assertNull(listener.failure.get());
}
public void testNoLongerPrimaryShardException() throws InterruptedException {
@ -355,36 +312,23 @@ public class ShardStateActionTests extends ESTestCase {
ShardRouting failedShard = getRandomShardRouting(index);
AtomicReference<Throwable> failure = new AtomicReference<>();
CountDownLatch latch = new CountDownLatch(1);
final TestListener listener = new TestListener();
long primaryTerm = clusterService.state().metaData().index(index).primaryTerm(failedShard.id());
assertThat(primaryTerm, greaterThanOrEqualTo(1L));
shardStateAction.remoteShardFailed(failedShard.shardId(), failedShard.allocationId().getId(),
primaryTerm + 1, randomBoolean(), "test", getSimulatedFailure(),
new ShardStateAction.Listener() {
@Override
public void onSuccess() {
failure.set(null);
latch.countDown();
}
@Override
public void onFailure(Exception e) {
failure.set(e);
latch.countDown();
}
});
primaryTerm + 1, randomBoolean(), "test", getSimulatedFailure(), listener);
ShardStateAction.NoLongerPrimaryShardException catastrophicError =
new ShardStateAction.NoLongerPrimaryShardException(failedShard.shardId(), "dummy failure");
CapturingTransport.CapturedRequest[] capturedRequests = transport.getCapturedRequestsAndClear();
transport.handleRemoteError(capturedRequests[0].requestId, catastrophicError);
latch.await();
assertNotNull(failure.get());
assertThat(failure.get(), instanceOf(ShardStateAction.NoLongerPrimaryShardException.class));
assertThat(failure.get().getMessage(), equalTo(catastrophicError.getMessage()));
listener.await();
final Exception failure = listener.failure.get();
assertNotNull(failure);
assertThat(failure, instanceOf(ShardStateAction.NoLongerPrimaryShardException.class));
assertThat(failure.getMessage(), equalTo(catastrophicError.getMessage()));
}
public void testCacheRemoteShardFailed() throws Exception {
@ -471,6 +415,26 @@ public class ShardStateActionTests extends ESTestCase {
masterThread.join();
}
public void testShardStarted() throws InterruptedException {
final String index = "test";
setState(clusterService, ClusterStateCreationUtils.stateWithActivePrimary(index, true, randomInt(5)));
final ShardRouting shardRouting = getRandomShardRouting(index);
final TestListener listener = new TestListener();
shardStateAction.shardStarted(shardRouting, "testShardStarted", listener);
final CapturingTransport.CapturedRequest[] capturedRequests = transport.getCapturedRequestsAndClear();
assertThat(capturedRequests[0].request, instanceOf(ShardStateAction.StartedShardEntry.class));
ShardStateAction.StartedShardEntry entry = (ShardStateAction.StartedShardEntry) capturedRequests[0].request;
assertThat(entry.shardId, equalTo(shardRouting.shardId()));
assertThat(entry.allocationId, equalTo(shardRouting.allocationId().getId()));
transport.handleResponse(capturedRequests[0].requestId, TransportResponse.Empty.INSTANCE);
listener.await();
assertNull(listener.failure.get());
}
private ShardRouting getRandomShardRouting(String index) {
IndexRoutingTable indexRoutingTable = clusterService.state().routingTable().index(index);
ShardsIterator shardsIterator = indexRoutingTable.randomAllActiveShardsIt();
@ -600,4 +564,32 @@ public class ShardStateActionTests extends ESTestCase {
}
});
}
private static class TestListener implements ShardStateAction.Listener {
private final SetOnce<Exception> failure = new SetOnce<>();
private final CountDownLatch latch = new CountDownLatch(1);
@Override
public void onSuccess() {
try {
failure.set(null);
} finally {
latch.countDown();
}
}
@Override
public void onFailure(final Exception e) {
try {
failure.set(e);
} finally {
latch.countDown();
}
}
void await() throws InterruptedException {
latch.await();
}
}
}