From a644bc095c347b9c823b880f9b7ac12dbaec8d47 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Fri, 25 Jan 2019 16:51:53 +0100 Subject: [PATCH] Add unit tests for ShardStateAction's ShardStartedClusterStateTaskExecutor (#37756) --- .../ClusterAllocationExplainActionTests.java | 14 +- .../ClusterStateCreationUtils.java | 2 + ...rdFailedClusterStateTaskExecutorTests.java | 3 +- ...dStartedClusterStateTaskExecutorTests.java | 197 ++++++++++++++++++ .../action/shard/ShardStateActionTests.java | 144 ++++++------- 5 files changed, 281 insertions(+), 79 deletions(-) create mode 100644 server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStartedClusterStateTaskExecutorTests.java diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/allocation/ClusterAllocationExplainActionTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/allocation/ClusterAllocationExplainActionTests.java index a75510cfb64..d0a55972cc1 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/allocation/ClusterAllocationExplainActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/allocation/ClusterAllocationExplainActionTests.java @@ -24,6 +24,8 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.routing.RoutingNode; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRoutingState; +import org.elasticsearch.cluster.routing.UnassignedInfo; +import org.elasticsearch.cluster.routing.allocation.AllocationDecision; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.cluster.routing.allocation.ShardAllocationDecision; import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocator; @@ -35,6 +37,7 @@ import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.gateway.TestGatewayAllocator; +import java.time.Instant; import java.util.Collections; import java.util.Locale; @@ -85,7 +88,16 @@ public class ClusterAllocationExplainActionTests extends ESTestCase { "wait until initialization has completed"; } assertEquals("{\"index\":\"idx\",\"shard\":0,\"primary\":true,\"current_state\":\"" + - shardRoutingState.toString().toLowerCase(Locale.ROOT) + "\",\"current_node\":" + + shardRoutingState.toString().toLowerCase(Locale.ROOT) + "\"" + + (shard.unassignedInfo() != null ? + ",\"unassigned_info\":{" + + "\"reason\":\"" + shard.unassignedInfo().getReason() + "\"," + + "\"at\":\""+ UnassignedInfo.DATE_TIME_FORMATTER.format( + Instant.ofEpochMilli(shard.unassignedInfo().getUnassignedTimeInMillis())) + "\"," + + "\"last_allocation_status\":\"" + AllocationDecision.fromAllocationStatus( + shard.unassignedInfo().getLastAllocationStatus()) + "\"}" + : "") + + ",\"current_node\":" + "{\"id\":\"" + cae.getCurrentNode().getId() + "\",\"name\":\"" + cae.getCurrentNode().getName() + "\",\"transport_address\":\"" + cae.getCurrentNode().getAddress() + "\"},\"explanation\":\"" + explanation + "\"}", Strings.toString(builder)); diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/ClusterStateCreationUtils.java b/server/src/test/java/org/elasticsearch/action/support/replication/ClusterStateCreationUtils.java index 60053748d68..6b628d88c59 100644 --- a/server/src/test/java/org/elasticsearch/action/support/replication/ClusterStateCreationUtils.java +++ b/server/src/test/java/org/elasticsearch/action/support/replication/ClusterStateCreationUtils.java @@ -118,6 +118,8 @@ public class ClusterStateCreationUtils { } if (primaryState == ShardRoutingState.RELOCATING) { relocatingNode = selectAndRemove(unassignedNodes); + } else if (primaryState == ShardRoutingState.INITIALIZING) { + unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, null); } } else { unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, null); diff --git a/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardFailedClusterStateTaskExecutorTests.java b/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardFailedClusterStateTaskExecutorTests.java index 60a5d4a3e3f..4dbe62cf5ce 100644 --- a/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardFailedClusterStateTaskExecutorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardFailedClusterStateTaskExecutorTests.java @@ -48,7 +48,6 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.index.Index; import org.elasticsearch.index.shard.ShardId; -import org.junit.Before; import java.util.ArrayList; import java.util.Arrays; @@ -73,7 +72,7 @@ public class ShardFailedClusterStateTaskExecutorTests extends ESAllocationTestCa private ClusterState clusterState; private ShardStateAction.ShardFailedClusterStateTaskExecutor executor; - @Before + @Override public void setUp() throws Exception { super.setUp(); allocationService = createAllocationService(Settings.builder() diff --git a/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStartedClusterStateTaskExecutorTests.java b/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStartedClusterStateTaskExecutorTests.java new file mode 100644 index 00000000000..1d3a523cdc9 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStartedClusterStateTaskExecutorTests.java @@ -0,0 +1,197 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.cluster.action.shard; + +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateTaskExecutor; +import org.elasticsearch.cluster.ESAllocationTestCase; +import org.elasticsearch.cluster.action.shard.ShardStateAction.StartedShardEntry; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.routing.IndexShardRoutingTable; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.ShardRoutingState; +import org.elasticsearch.cluster.routing.allocation.AllocationService; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.shard.ShardId; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.function.Consumer; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import java.util.stream.Stream; + +import static java.util.Collections.singletonList; +import static org.elasticsearch.action.support.replication.ClusterStateCreationUtils.state; +import static org.elasticsearch.action.support.replication.ClusterStateCreationUtils.stateWithActivePrimary; +import static org.elasticsearch.action.support.replication.ClusterStateCreationUtils.stateWithAssignedPrimariesAndReplicas; +import static org.elasticsearch.action.support.replication.ClusterStateCreationUtils.stateWithNoShard; +import static org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.notNullValue; + +public class ShardStartedClusterStateTaskExecutorTests extends ESAllocationTestCase { + + private ShardStateAction.ShardStartedClusterStateTaskExecutor executor; + + @Override + public void setUp() throws Exception { + super.setUp(); + AllocationService allocationService = createAllocationService(Settings.builder() + .put(CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING.getKey(), Integer.MAX_VALUE) + .build()); + executor = new ShardStateAction.ShardStartedClusterStateTaskExecutor(allocationService, logger); + } + + public void testEmptyTaskListProducesSameClusterState() throws Exception { + final ClusterState clusterState = stateWithNoShard(); + assertTasksExecution(clusterState, Collections.emptyList(), result -> assertSame(clusterState, result.resultingState)); + } + + public void testNonExistentIndexMarkedAsSuccessful() throws Exception { + final ClusterState clusterState = stateWithNoShard(); + final StartedShardEntry entry = new StartedShardEntry(new ShardId("test", "_na", 0), "aId", "test"); + assertTasksExecution(clusterState, singletonList(entry), + result -> { + assertSame(clusterState, result.resultingState); + assertThat(result.executionResults.size(), equalTo(1)); + assertThat(result.executionResults.containsKey(entry), is(true)); + assertThat(((ClusterStateTaskExecutor.TaskResult) result.executionResults.get(entry)).isSuccess(), is(true)); + }); + } + + public void testNonExistentShardsAreMarkedAsSuccessful() throws Exception { + final String indexName = "test"; + final ClusterState clusterState = stateWithActivePrimary(indexName, true, randomInt(2), randomInt(2)); + + final IndexMetaData indexMetaData = clusterState.metaData().index(indexName); + final List tasks = Stream.concat( + // Existent shard id but different allocation id + IntStream.range(0, randomIntBetween(1, 5)) + .mapToObj(i -> new StartedShardEntry(new ShardId(indexMetaData.getIndex(), 0), String.valueOf(i), "allocation id")), + // Non existent shard id + IntStream.range(1, randomIntBetween(2, 5)) + .mapToObj(i -> new StartedShardEntry(new ShardId(indexMetaData.getIndex(), i), String.valueOf(i), "shard id")) + + ).collect(Collectors.toList()); + + assertTasksExecution(clusterState, tasks, result -> { + assertSame(clusterState, result.resultingState); + assertThat(result.executionResults.size(), equalTo(tasks.size())); + tasks.forEach(task -> { + assertThat(result.executionResults.containsKey(task), is(true)); + assertThat(((ClusterStateTaskExecutor.TaskResult) result.executionResults.get(task)).isSuccess(), is(true)); + }); + }); + } + + public void testNonInitializingShardAreMarkedAsSuccessful() throws Exception { + final String indexName = "test"; + final ClusterState clusterState = stateWithAssignedPrimariesAndReplicas(new String[]{indexName}, randomIntBetween(2, 10), 1); + + final IndexMetaData indexMetaData = clusterState.metaData().index(indexName); + final List tasks = IntStream.range(0, randomIntBetween(1, indexMetaData.getNumberOfShards())) + .mapToObj(i -> { + final ShardId shardId = new ShardId(indexMetaData.getIndex(), i); + final IndexShardRoutingTable shardRoutingTable = clusterState.routingTable().shardRoutingTable(shardId); + final String allocationId; + if (randomBoolean()) { + allocationId = shardRoutingTable.primaryShard().allocationId().getId(); + } else { + allocationId = shardRoutingTable.replicaShards().iterator().next().allocationId().getId(); + } + return new StartedShardEntry(shardId, allocationId, "test"); + }).collect(Collectors.toList()); + + assertTasksExecution(clusterState, tasks, result -> { + assertSame(clusterState, result.resultingState); + assertThat(result.executionResults.size(), equalTo(tasks.size())); + tasks.forEach(task -> { + assertThat(result.executionResults.containsKey(task), is(true)); + assertThat(((ClusterStateTaskExecutor.TaskResult) result.executionResults.get(task)).isSuccess(), is(true)); + }); + }); + } + + public void testStartedShards() throws Exception { + final String indexName = "test"; + final ClusterState clusterState = state(indexName, randomBoolean(), ShardRoutingState.INITIALIZING, ShardRoutingState.INITIALIZING); + + final IndexMetaData indexMetaData = clusterState.metaData().index(indexName); + final ShardId shardId = new ShardId(indexMetaData.getIndex(), 0); + final ShardRouting primaryShard = clusterState.routingTable().shardRoutingTable(shardId).primaryShard(); + final String primaryAllocationId = primaryShard.allocationId().getId(); + + final List tasks = new ArrayList<>(); + tasks.add(new StartedShardEntry(shardId, primaryAllocationId, "test")); + if (randomBoolean()) { + final ShardRouting replicaShard = clusterState.routingTable().shardRoutingTable(shardId).replicaShards().iterator().next(); + final String replicaAllocationId = replicaShard.allocationId().getId(); + tasks.add(new StartedShardEntry(shardId, replicaAllocationId, "test")); + } + assertTasksExecution(clusterState, tasks, result -> { + assertNotSame(clusterState, result.resultingState); + assertThat(result.executionResults.size(), equalTo(tasks.size())); + tasks.forEach(task -> { + assertThat(result.executionResults.containsKey(task), is(true)); + assertThat(((ClusterStateTaskExecutor.TaskResult) result.executionResults.get(task)).isSuccess(), is(true)); + + final IndexShardRoutingTable shardRoutingTable = result.resultingState.routingTable().shardRoutingTable(task.shardId); + assertThat(shardRoutingTable.getByAllocationId(task.allocationId).state(), is(ShardRoutingState.STARTED)); + }); + }); + } + + public void testDuplicateStartsAreOkay() throws Exception { + final String indexName = "test"; + final ClusterState clusterState = state(indexName, randomBoolean(), ShardRoutingState.INITIALIZING); + + final IndexMetaData indexMetaData = clusterState.metaData().index(indexName); + final ShardId shardId = new ShardId(indexMetaData.getIndex(), 0); + final ShardRouting shardRouting = clusterState.routingTable().shardRoutingTable(shardId).primaryShard(); + final String allocationId = shardRouting.allocationId().getId(); + + final List tasks = IntStream.range(0, randomIntBetween(2, 10)) + .mapToObj(i -> new StartedShardEntry(shardId, allocationId, "test")) + .collect(Collectors.toList()); + + assertTasksExecution(clusterState, tasks, result -> { + assertNotSame(clusterState, result.resultingState); + assertThat(result.executionResults.size(), equalTo(tasks.size())); + tasks.forEach(task -> { + assertThat(result.executionResults.containsKey(task), is(true)); + assertThat(((ClusterStateTaskExecutor.TaskResult) result.executionResults.get(task)).isSuccess(), is(true)); + + final IndexShardRoutingTable shardRoutingTable = result.resultingState.routingTable().shardRoutingTable(task.shardId); + assertThat(shardRoutingTable.getByAllocationId(task.allocationId).state(), is(ShardRoutingState.STARTED)); + }); + }); + } + + private void assertTasksExecution(final ClusterState state, + final List tasks, + final Consumer consumer) throws Exception { + final ClusterStateTaskExecutor.ClusterTasksResult result = executor.execute(state, tasks); + assertThat(result, notNullValue()); + consumer.accept(result); + } +} diff --git a/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java b/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java index 2a994e28618..e94a974ae7a 100644 --- a/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java @@ -20,6 +20,7 @@ package org.elasticsearch.cluster.action.shard; import org.apache.lucene.index.CorruptIndexException; +import org.apache.lucene.util.SetOnce; import org.elasticsearch.Version; import org.elasticsearch.action.support.replication.ClusterStateCreationUtils; import org.elasticsearch.cluster.ClusterState; @@ -156,24 +157,9 @@ public class ShardStateActionTests extends ESTestCase { setState(clusterService, ClusterStateCreationUtils.stateWithActivePrimary(index, true, randomInt(5))); - AtomicBoolean success = new AtomicBoolean(); - CountDownLatch latch = new CountDownLatch(1); - + final TestListener listener = new TestListener(); ShardRouting shardRouting = getRandomShardRouting(index); - shardStateAction.localShardFailed(shardRouting, "test", getSimulatedFailure(), new ShardStateAction.Listener() { - @Override - public void onSuccess() { - success.set(true); - latch.countDown(); - } - - @Override - public void onFailure(Exception e) { - success.set(false); - latch.countDown(); - assert false; - } - }); + shardStateAction.localShardFailed(shardRouting, "test", getSimulatedFailure(), listener); CapturingTransport.CapturedRequest[] capturedRequests = transport.getCapturedRequestsAndClear(); assertEquals(1, capturedRequests.length); @@ -188,8 +174,8 @@ public class ShardStateActionTests extends ESTestCase { transport.handleResponse(capturedRequests[0].requestId, TransportResponse.Empty.INSTANCE); - latch.await(); - assertTrue(success.get()); + listener.await(); + assertNull(listener.failure.get()); } public void testNoMaster() throws InterruptedException { @@ -291,28 +277,14 @@ public class ShardStateActionTests extends ESTestCase { setState(clusterService, ClusterStateCreationUtils.stateWithActivePrimary(index, true, randomInt(5))); - AtomicBoolean failure = new AtomicBoolean(); - + final TestListener listener = new TestListener(); ShardRouting failedShard = getRandomShardRouting(index); - shardStateAction.localShardFailed(failedShard, "test", getSimulatedFailure(), new ShardStateAction.Listener() { - @Override - public void onSuccess() { - failure.set(false); - assert false; - } - - @Override - public void onFailure(Exception e) { - failure.set(true); - } - }); + shardStateAction.localShardFailed(failedShard, "test", getSimulatedFailure(), listener); final CapturingTransport.CapturedRequest[] capturedRequests = transport.getCapturedRequestsAndClear(); assertThat(capturedRequests.length, equalTo(1)); - assertFalse(failure.get()); transport.handleRemoteError(capturedRequests[0].requestId, new TransportException("simulated")); - - assertTrue(failure.get()); + assertNotNull(listener.failure.get()); } public void testShardNotFound() throws InterruptedException { @@ -320,32 +292,17 @@ public class ShardStateActionTests extends ESTestCase { setState(clusterService, ClusterStateCreationUtils.stateWithActivePrimary(index, true, randomInt(5))); - AtomicBoolean success = new AtomicBoolean(); - CountDownLatch latch = new CountDownLatch(1); - + final TestListener listener = new TestListener(); ShardRouting failedShard = getRandomShardRouting(index); RoutingTable routingTable = RoutingTable.builder(clusterService.state().getRoutingTable()).remove(index).build(); setState(clusterService, ClusterState.builder(clusterService.state()).routingTable(routingTable)); - shardStateAction.localShardFailed(failedShard, "test", getSimulatedFailure(), new ShardStateAction.Listener() { - @Override - public void onSuccess() { - success.set(true); - latch.countDown(); - } - - @Override - public void onFailure(Exception e) { - success.set(false); - latch.countDown(); - assert false; - } - }); + shardStateAction.localShardFailed(failedShard, "test", getSimulatedFailure(), listener); CapturingTransport.CapturedRequest[] capturedRequests = transport.getCapturedRequestsAndClear(); transport.handleResponse(capturedRequests[0].requestId, TransportResponse.Empty.INSTANCE); - latch.await(); - assertTrue(success.get()); + listener.await(); + assertNull(listener.failure.get()); } public void testNoLongerPrimaryShardException() throws InterruptedException { @@ -355,36 +312,23 @@ public class ShardStateActionTests extends ESTestCase { ShardRouting failedShard = getRandomShardRouting(index); - AtomicReference failure = new AtomicReference<>(); - CountDownLatch latch = new CountDownLatch(1); - + final TestListener listener = new TestListener(); long primaryTerm = clusterService.state().metaData().index(index).primaryTerm(failedShard.id()); assertThat(primaryTerm, greaterThanOrEqualTo(1L)); shardStateAction.remoteShardFailed(failedShard.shardId(), failedShard.allocationId().getId(), - primaryTerm + 1, randomBoolean(), "test", getSimulatedFailure(), - new ShardStateAction.Listener() { - @Override - public void onSuccess() { - failure.set(null); - latch.countDown(); - } - - @Override - public void onFailure(Exception e) { - failure.set(e); - latch.countDown(); - } - }); + primaryTerm + 1, randomBoolean(), "test", getSimulatedFailure(), listener); ShardStateAction.NoLongerPrimaryShardException catastrophicError = new ShardStateAction.NoLongerPrimaryShardException(failedShard.shardId(), "dummy failure"); CapturingTransport.CapturedRequest[] capturedRequests = transport.getCapturedRequestsAndClear(); transport.handleRemoteError(capturedRequests[0].requestId, catastrophicError); - latch.await(); - assertNotNull(failure.get()); - assertThat(failure.get(), instanceOf(ShardStateAction.NoLongerPrimaryShardException.class)); - assertThat(failure.get().getMessage(), equalTo(catastrophicError.getMessage())); + listener.await(); + + final Exception failure = listener.failure.get(); + assertNotNull(failure); + assertThat(failure, instanceOf(ShardStateAction.NoLongerPrimaryShardException.class)); + assertThat(failure.getMessage(), equalTo(catastrophicError.getMessage())); } public void testCacheRemoteShardFailed() throws Exception { @@ -471,6 +415,26 @@ public class ShardStateActionTests extends ESTestCase { masterThread.join(); } + public void testShardStarted() throws InterruptedException { + final String index = "test"; + setState(clusterService, ClusterStateCreationUtils.stateWithActivePrimary(index, true, randomInt(5))); + + final ShardRouting shardRouting = getRandomShardRouting(index); + final TestListener listener = new TestListener(); + shardStateAction.shardStarted(shardRouting, "testShardStarted", listener); + + final CapturingTransport.CapturedRequest[] capturedRequests = transport.getCapturedRequestsAndClear(); + assertThat(capturedRequests[0].request, instanceOf(ShardStateAction.StartedShardEntry.class)); + + ShardStateAction.StartedShardEntry entry = (ShardStateAction.StartedShardEntry) capturedRequests[0].request; + assertThat(entry.shardId, equalTo(shardRouting.shardId())); + assertThat(entry.allocationId, equalTo(shardRouting.allocationId().getId())); + + transport.handleResponse(capturedRequests[0].requestId, TransportResponse.Empty.INSTANCE); + listener.await(); + assertNull(listener.failure.get()); + } + private ShardRouting getRandomShardRouting(String index) { IndexRoutingTable indexRoutingTable = clusterService.state().routingTable().index(index); ShardsIterator shardsIterator = indexRoutingTable.randomAllActiveShardsIt(); @@ -600,4 +564,32 @@ public class ShardStateActionTests extends ESTestCase { } }); } + + private static class TestListener implements ShardStateAction.Listener { + + private final SetOnce failure = new SetOnce<>(); + private final CountDownLatch latch = new CountDownLatch(1); + + @Override + public void onSuccess() { + try { + failure.set(null); + } finally { + latch.countDown(); + } + } + + @Override + public void onFailure(final Exception e) { + try { + failure.set(e); + } finally { + latch.countDown(); + } + } + + void await() throws InterruptedException { + latch.await(); + } + } }