Handle partial failure retrieving segments in SegmentCountStep (#46556)
Since the `IndicesSegmentsRequest` scatters to all shards for the index, it's possible that some of the shards may fail. This adds failure handling and logging (since this is a best-effort step in the first place) for this case.
This commit is contained in:
parent
0963e78164
commit
09a9cefaa0
|
@ -11,6 +11,7 @@ import org.elasticsearch.action.ActionListener;
|
|||
import org.elasticsearch.action.admin.indices.segments.IndexSegments;
|
||||
import org.elasticsearch.action.admin.indices.segments.IndicesSegmentsRequest;
|
||||
import org.elasticsearch.action.admin.indices.segments.ShardSegments;
|
||||
import org.elasticsearch.action.support.DefaultShardOperationFailedException;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||
|
@ -50,8 +51,20 @@ public class SegmentCountStep extends AsyncWaitStep {
|
|||
public void evaluateCondition(IndexMetaData indexMetaData, Listener listener) {
|
||||
getClient().admin().indices().segments(new IndicesSegmentsRequest(indexMetaData.getIndex().getName()),
|
||||
ActionListener.wrap(response -> {
|
||||
IndexSegments segments = response.getIndices().get(indexMetaData.getIndex().getName());
|
||||
List<ShardSegments> unmergedShards = segments.getShards().values().stream()
|
||||
IndexSegments idxSegments = response.getIndices().get(indexMetaData.getIndex().getName());
|
||||
if (idxSegments == null || (response.getShardFailures() != null && response.getShardFailures().length > 0)) {
|
||||
final DefaultShardOperationFailedException[] failures = response.getShardFailures();
|
||||
logger.info("[{}] retrieval of segment counts after force merge did not succeed, " +
|
||||
"there were {} shard failures. " +
|
||||
"failures: {}",
|
||||
indexMetaData.getIndex().getName(),
|
||||
response.getFailedShards(),
|
||||
failures == null ? "n/a" : Strings.collectionToDelimitedString(Arrays.stream(failures)
|
||||
.map(Strings::toString)
|
||||
.collect(Collectors.toList()), ","));
|
||||
listener.onResponse(true, new Info(-1));
|
||||
} else {
|
||||
List<ShardSegments> unmergedShards = idxSegments.getShards().values().stream()
|
||||
.flatMap(iss -> Arrays.stream(iss.getShards()))
|
||||
.filter(shardSegments -> shardSegments.getSegments().size() > maxNumSegments)
|
||||
.collect(Collectors.toList());
|
||||
|
@ -63,6 +76,7 @@ public class SegmentCountStep extends AsyncWaitStep {
|
|||
}
|
||||
// Force merging is best effort, so always return true that the condition has been met.
|
||||
listener.onResponse(true, new Info(unmergedShards.size()));
|
||||
}
|
||||
}, listener::onFailure));
|
||||
}
|
||||
|
||||
|
|
|
@ -12,6 +12,7 @@ import org.elasticsearch.action.admin.indices.segments.IndexSegments;
|
|||
import org.elasticsearch.action.admin.indices.segments.IndexShardSegments;
|
||||
import org.elasticsearch.action.admin.indices.segments.IndicesSegmentResponse;
|
||||
import org.elasticsearch.action.admin.indices.segments.ShardSegments;
|
||||
import org.elasticsearch.action.support.DefaultShardOperationFailedException;
|
||||
import org.elasticsearch.client.AdminClient;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.client.IndicesAdminClient;
|
||||
|
@ -130,6 +131,7 @@ public class SegmentCountStepTests extends AbstractStepTestCase<SegmentCountStep
|
|||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
logger.warn("unexpected onFailure call", e);
|
||||
throw new AssertionError("unexpected method call");
|
||||
}
|
||||
});
|
||||
|
@ -187,6 +189,7 @@ public class SegmentCountStepTests extends AbstractStepTestCase<SegmentCountStep
|
|||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
logger.warn("unexpected onFailure call", e);
|
||||
throw new AssertionError("unexpected method call");
|
||||
}
|
||||
});
|
||||
|
@ -195,6 +198,67 @@ public class SegmentCountStepTests extends AbstractStepTestCase<SegmentCountStep
|
|||
assertEquals(new SegmentCountStep.Info(0L), conditionInfo.get());
|
||||
}
|
||||
|
||||
public void testFailedToRetrieveSomeSegments() {
|
||||
int maxNumSegments = randomIntBetween(3, 10);
|
||||
Index index = new Index(randomAlphaOfLengthBetween(1, 20), randomAlphaOfLengthBetween(1, 20));
|
||||
Client client = Mockito.mock(Client.class);
|
||||
AdminClient adminClient = Mockito.mock(AdminClient.class);
|
||||
IndicesAdminClient indicesClient = Mockito.mock(IndicesAdminClient.class);
|
||||
IndicesSegmentResponse indicesSegmentResponse = Mockito.mock(IndicesSegmentResponse.class);
|
||||
IndexSegments indexSegments = Mockito.mock(IndexSegments.class);
|
||||
IndexShardSegments indexShardSegments = Mockito.mock(IndexShardSegments.class);
|
||||
Map<Integer, IndexShardSegments> indexShards = Collections.singletonMap(0, indexShardSegments);
|
||||
ShardSegments shardSegmentsOne = Mockito.mock(ShardSegments.class);
|
||||
ShardSegments[] shardSegmentsArray = new ShardSegments[] { shardSegmentsOne };
|
||||
Spliterator<IndexShardSegments> iss = indexShards.values().spliterator();
|
||||
List<Segment> segments = new ArrayList<>();
|
||||
for (int i = 0; i < maxNumSegments + randomIntBetween(1, 3); i++) {
|
||||
segments.add(null);
|
||||
}
|
||||
Mockito.when(indicesSegmentResponse.getStatus()).thenReturn(RestStatus.OK);
|
||||
Mockito.when(indicesSegmentResponse.getIndices()).thenReturn(Collections.singletonMap(index.getName(), null));
|
||||
Mockito.when(indicesSegmentResponse.getShardFailures())
|
||||
.thenReturn(new DefaultShardOperationFailedException[]{new DefaultShardOperationFailedException(index.getName(),
|
||||
0, new IllegalArgumentException("fake"))});
|
||||
Mockito.when(indexSegments.spliterator()).thenReturn(iss);
|
||||
Mockito.when(indexShardSegments.getShards()).thenReturn(shardSegmentsArray);
|
||||
Mockito.when(shardSegmentsOne.getSegments()).thenReturn(segments);
|
||||
|
||||
Mockito.when(client.admin()).thenReturn(adminClient);
|
||||
Mockito.when(adminClient.indices()).thenReturn(indicesClient);
|
||||
|
||||
Step.StepKey stepKey = randomStepKey();
|
||||
StepKey nextStepKey = randomStepKey();
|
||||
|
||||
Mockito.doAnswer(invocationOnMock -> {
|
||||
@SuppressWarnings("unchecked")
|
||||
ActionListener<IndicesSegmentResponse> listener = (ActionListener<IndicesSegmentResponse>) invocationOnMock.getArguments()[1];
|
||||
listener.onResponse(indicesSegmentResponse);
|
||||
return null;
|
||||
}).when(indicesClient).segments(any(), any());
|
||||
|
||||
SetOnce<Boolean> conditionMetResult = new SetOnce<>();
|
||||
SetOnce<ToXContentObject> conditionInfo = new SetOnce<>();
|
||||
|
||||
SegmentCountStep step = new SegmentCountStep(stepKey, nextStepKey, client, maxNumSegments);
|
||||
step.evaluateCondition(makeMeta(index), new AsyncWaitStep.Listener() {
|
||||
@Override
|
||||
public void onResponse(boolean conditionMet, ToXContentObject info) {
|
||||
conditionMetResult.set(conditionMet);
|
||||
conditionInfo.set(info);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
logger.warn("unexpected onFailure call", e);
|
||||
throw new AssertionError("unexpected method call: " + e);
|
||||
}
|
||||
});
|
||||
|
||||
assertTrue(conditionMetResult.get());
|
||||
assertEquals(new SegmentCountStep.Info(-1L), conditionInfo.get());
|
||||
}
|
||||
|
||||
public void testThrowsException() {
|
||||
Exception exception = new RuntimeException("error");
|
||||
Index index = new Index(randomAlphaOfLengthBetween(1, 20), randomAlphaOfLengthBetween(1, 20));
|
||||
|
|
Loading…
Reference in New Issue