Make ILM force merging best effort (#43246)
It's possible for force merges kicked off by ILM to silently stop (due to a node relocating for example). In which case, the segment count may not reach what the user configured. In the subsequent `SegmentCountStep` waiting for the expected segment count may wait indefinitely. Because of this, this commit makes force merges "best effort" and then changes the `SegmentCountStep` to simply report (at INFO level) if the merge was not successful. Relates to #42824 Resolves #43245
This commit is contained in:
parent
ba15d08e14
commit
21da84edbc
|
@ -5,10 +5,15 @@
|
|||
*/
|
||||
package org.elasticsearch.xpack.core.indexlifecycle;
|
||||
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.admin.indices.segments.IndexSegments;
|
||||
import org.elasticsearch.action.admin.indices.segments.IndicesSegmentsRequest;
|
||||
import org.elasticsearch.action.admin.indices.segments.ShardSegments;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||
import org.elasticsearch.common.ParseField;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
|
||||
|
@ -17,13 +22,17 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.stream.StreamSupport;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* This {@link Step} evaluates whether force_merge was successful by checking the segment count.
|
||||
*/
|
||||
public class SegmentCountStep extends AsyncWaitStep {
|
||||
|
||||
private static final Logger logger = LogManager.getLogger(SegmentCountStep.class);
|
||||
public static final String NAME = "segment-count";
|
||||
|
||||
private final int maxNumSegments;
|
||||
|
@ -41,10 +50,19 @@ public class SegmentCountStep extends AsyncWaitStep {
|
|||
public void evaluateCondition(IndexMetaData indexMetaData, Listener listener) {
|
||||
getClient().admin().indices().segments(new IndicesSegmentsRequest(indexMetaData.getIndex().getName()),
|
||||
ActionListener.wrap(response -> {
|
||||
long numberShardsLeftToMerge =
|
||||
StreamSupport.stream(response.getIndices().get(indexMetaData.getIndex().getName()).spliterator(), false)
|
||||
.filter(iss -> Arrays.stream(iss.getShards()).anyMatch(p -> p.getSegments().size() > maxNumSegments)).count();
|
||||
listener.onResponse(numberShardsLeftToMerge == 0, new Info(numberShardsLeftToMerge));
|
||||
IndexSegments segments = response.getIndices().get(indexMetaData.getIndex().getName());
|
||||
List<ShardSegments> unmergedShards = segments.getShards().values().stream()
|
||||
.flatMap(iss -> Arrays.stream(iss.getShards()))
|
||||
.filter(shardSegments -> shardSegments.getSegments().size() > maxNumSegments)
|
||||
.collect(Collectors.toList());
|
||||
if (unmergedShards.size() > 0) {
|
||||
Map<ShardRouting, Integer> unmergedShardCounts = unmergedShards.stream()
|
||||
.collect(Collectors.toMap(ShardSegments::getShardRouting, ss -> ss.getSegments().size()));
|
||||
logger.info("[{}] best effort force merge to [{}] segments did not succeed for {} shards: {}",
|
||||
indexMetaData.getIndex().getName(), maxNumSegments, unmergedShards.size(), unmergedShardCounts);
|
||||
}
|
||||
// Force merging is best effort, so always return true that the condition has been met.
|
||||
listener.onResponse(true, new Info(unmergedShards.size()));
|
||||
}, listener::onFailure));
|
||||
}
|
||||
|
||||
|
@ -90,8 +108,12 @@ public class SegmentCountStep extends AsyncWaitStep {
|
|||
@Override
|
||||
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
builder.startObject();
|
||||
builder.field(MESSAGE.getPreferredName(),
|
||||
"Waiting for [" + numberShardsLeftToMerge + "] shards " + "to forcemerge");
|
||||
if (numberShardsLeftToMerge == 0) {
|
||||
builder.field(MESSAGE.getPreferredName(), "all shards force merged successfully");
|
||||
} else {
|
||||
builder.field(MESSAGE.getPreferredName(),
|
||||
"[" + numberShardsLeftToMerge + "] shards did not successfully force merge");
|
||||
}
|
||||
builder.field(SHARDS_TO_MERGE.getPreferredName(), numberShardsLeftToMerge);
|
||||
builder.endObject();
|
||||
return builder;
|
||||
|
|
|
@ -138,7 +138,7 @@ public class SegmentCountStepTests extends AbstractStepTestCase<SegmentCountStep
|
|||
assertEquals(new SegmentCountStep.Info(0L), conditionInfo.get());
|
||||
}
|
||||
|
||||
public void testIsConditionFails() {
|
||||
public void testIsConditionIsTrueEvenWhenMoreSegments() {
|
||||
int maxNumSegments = randomIntBetween(3, 10);
|
||||
Index index = new Index(randomAlphaOfLengthBetween(1, 20), randomAlphaOfLengthBetween(1, 20));
|
||||
Client client = Mockito.mock(Client.class);
|
||||
|
@ -191,8 +191,8 @@ public class SegmentCountStepTests extends AbstractStepTestCase<SegmentCountStep
|
|||
}
|
||||
});
|
||||
|
||||
assertFalse(conditionMetResult.get());
|
||||
assertEquals(new SegmentCountStep.Info(1L), conditionInfo.get());
|
||||
assertTrue(conditionMetResult.get());
|
||||
assertEquals(new SegmentCountStep.Info(0L), conditionInfo.get());
|
||||
}
|
||||
|
||||
public void testThrowsException() {
|
||||
|
|
Loading…
Reference in New Issue