Add retry to force merge operation in EnrichPolicyRunner (#47178)
Adds a check when running an Enrich policy to make sure that an Enrich index is force merged down to one segment, and if it was not fully merged, attempts the merge again, up to a configurable number of times.
This commit is contained in:
parent
8b7100eb1f
commit
b9fb354618
|
@ -83,6 +83,9 @@ public class EnrichPlugin extends Plugin implements ActionPlugin, IngestPlugin {
|
|||
public static final Setting<Integer> COORDINATOR_PROXY_MAX_LOOKUPS_PER_REQUEST =
|
||||
Setting.intSetting("enrich.coordinator_proxy.max_lookups_per_request", 128, 1, 10000, Setting.Property.NodeScope);
|
||||
|
||||
static final Setting<Integer> ENRICH_MAX_FORCE_MERGE_ATTEMPTS =
|
||||
Setting.intSetting("enrich.max_force_merge_attempts", 3, 1, 10, Setting.Property.NodeScope);
|
||||
|
||||
private static final String QUEUE_CAPACITY_SETTING_NAME = "enrich.coordinator_proxy.queue_capacity";
|
||||
public static final Setting<Integer> COORDINATOR_PROXY_QUEUE_CAPACITY = new Setting<>(QUEUE_CAPACITY_SETTING_NAME,
|
||||
settings -> {
|
||||
|
@ -196,7 +199,8 @@ public class EnrichPlugin extends Plugin implements ActionPlugin, IngestPlugin {
|
|||
ENRICH_CLEANUP_PERIOD,
|
||||
COORDINATOR_PROXY_MAX_CONCURRENT_REQUESTS,
|
||||
COORDINATOR_PROXY_MAX_LOOKUPS_PER_REQUEST,
|
||||
COORDINATOR_PROXY_QUEUE_CAPACITY
|
||||
COORDINATOR_PROXY_QUEUE_CAPACITY,
|
||||
ENRICH_MAX_FORCE_MERGE_ATTEMPTS
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -28,6 +28,7 @@ public class EnrichPolicyExecutor {
|
|||
private final int fetchSize;
|
||||
private final EnrichPolicyLocks policyLocks;
|
||||
private final int maximumConcurrentPolicyExecutions;
|
||||
private final int maxForceMergeAttempts;
|
||||
private final Semaphore policyExecutionPermits;
|
||||
|
||||
EnrichPolicyExecutor(Settings settings,
|
||||
|
@ -45,6 +46,7 @@ public class EnrichPolicyExecutor {
|
|||
this.policyLocks = policyLocks;
|
||||
this.fetchSize = EnrichPlugin.ENRICH_FETCH_SIZE_SETTING.get(settings);
|
||||
this.maximumConcurrentPolicyExecutions = EnrichPlugin.ENRICH_MAX_CONCURRENT_POLICY_EXECUTIONS.get(settings);
|
||||
this.maxForceMergeAttempts = EnrichPlugin.ENRICH_MAX_FORCE_MERGE_ATTEMPTS.get(settings);
|
||||
this.policyExecutionPermits = new Semaphore(maximumConcurrentPolicyExecutions);
|
||||
}
|
||||
|
||||
|
@ -90,7 +92,7 @@ public class EnrichPolicyExecutor {
|
|||
|
||||
protected Runnable createPolicyRunner(String policyName, EnrichPolicy policy, ActionListener<PolicyExecutionResult> listener) {
|
||||
return new EnrichPolicyRunner(policyName, policy, listener, clusterService, client, indexNameExpressionResolver, nowSupplier,
|
||||
fetchSize);
|
||||
fetchSize, maxForceMergeAttempts);
|
||||
}
|
||||
|
||||
public void runPolicy(String policyId, ActionListener<PolicyExecutionResult> listener) {
|
||||
|
|
|
@ -21,6 +21,11 @@ import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
|
|||
import org.elasticsearch.action.admin.indices.get.GetIndexResponse;
|
||||
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
|
||||
import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
|
||||
import org.elasticsearch.action.admin.indices.segments.IndexSegments;
|
||||
import org.elasticsearch.action.admin.indices.segments.IndexShardSegments;
|
||||
import org.elasticsearch.action.admin.indices.segments.IndicesSegmentResponse;
|
||||
import org.elasticsearch.action.admin.indices.segments.IndicesSegmentsRequest;
|
||||
import org.elasticsearch.action.admin.indices.segments.ShardSegments;
|
||||
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
|
||||
import org.elasticsearch.action.support.master.AcknowledgedResponse;
|
||||
import org.elasticsearch.client.Client;
|
||||
|
@ -71,10 +76,11 @@ public class EnrichPolicyRunner implements Runnable {
|
|||
private final IndexNameExpressionResolver indexNameExpressionResolver;
|
||||
private final LongSupplier nowSupplier;
|
||||
private final int fetchSize;
|
||||
private final int maxForceMergeAttempts;
|
||||
|
||||
EnrichPolicyRunner(String policyName, EnrichPolicy policy, ActionListener<PolicyExecutionResult> listener,
|
||||
ClusterService clusterService, Client client, IndexNameExpressionResolver indexNameExpressionResolver,
|
||||
LongSupplier nowSupplier, int fetchSize) {
|
||||
LongSupplier nowSupplier, int fetchSize, int maxForceMergeAttempts) {
|
||||
this.policyName = policyName;
|
||||
this.policy = policy;
|
||||
this.listener = listener;
|
||||
|
@ -83,6 +89,7 @@ public class EnrichPolicyRunner implements Runnable {
|
|||
this.indexNameExpressionResolver = indexNameExpressionResolver;
|
||||
this.nowSupplier = nowSupplier;
|
||||
this.fetchSize = fetchSize;
|
||||
this.maxForceMergeAttempts = maxForceMergeAttempts;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -239,6 +246,7 @@ public class EnrichPolicyRunner implements Runnable {
|
|||
long nowTimestamp = nowSupplier.getAsLong();
|
||||
String enrichIndexName = EnrichPolicy.getBaseName(policyName) + "-" + nowTimestamp;
|
||||
Settings enrichIndexSettings = Settings.builder()
|
||||
.put("index.number_of_shards", 1)
|
||||
.put("index.number_of_replicas", 0)
|
||||
// No changes will be made to an enrich index after policy execution, so need to enable automatic refresh interval:
|
||||
.put("index.refresh_interval", -1)
|
||||
|
@ -310,7 +318,7 @@ public class EnrichPolicyRunner implements Runnable {
|
|||
} else {
|
||||
logger.info("Policy [{}]: Transferred [{}] documents to enrich index [{}]", policyName,
|
||||
bulkByScrollResponse.getCreated(), destinationIndexName);
|
||||
forceMergeEnrichIndex(destinationIndexName);
|
||||
forceMergeEnrichIndex(destinationIndexName, 1);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -321,13 +329,14 @@ public class EnrichPolicyRunner implements Runnable {
|
|||
});
|
||||
}
|
||||
|
||||
private void forceMergeEnrichIndex(final String destinationIndexName) {
|
||||
logger.debug("Policy [{}]: Force merging newly created enrich index [{}]", policyName, destinationIndexName);
|
||||
private void forceMergeEnrichIndex(final String destinationIndexName, final int attempt) {
|
||||
logger.debug("Policy [{}]: Force merging newly created enrich index [{}] (Attempt {}/{})", policyName, destinationIndexName,
|
||||
attempt, maxForceMergeAttempts);
|
||||
client.admin().indices().forceMerge(new ForceMergeRequest(destinationIndexName).maxNumSegments(1),
|
||||
new ActionListener<ForceMergeResponse>() {
|
||||
@Override
|
||||
public void onResponse(ForceMergeResponse forceMergeResponse) {
|
||||
refreshEnrichIndex(destinationIndexName);
|
||||
refreshEnrichIndex(destinationIndexName, attempt);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -337,12 +346,50 @@ public class EnrichPolicyRunner implements Runnable {
|
|||
});
|
||||
}
|
||||
|
||||
private void refreshEnrichIndex(final String destinationIndexName) {
|
||||
logger.debug("Policy [{}]: Refreshing newly created enrich index [{}]", policyName, destinationIndexName);
|
||||
private void refreshEnrichIndex(final String destinationIndexName, final int attempt) {
|
||||
logger.debug("Policy [{}]: Refreshing enrich index [{}]", policyName, destinationIndexName);
|
||||
client.admin().indices().refresh(new RefreshRequest(destinationIndexName), new ActionListener<RefreshResponse>() {
|
||||
@Override
|
||||
public void onResponse(RefreshResponse refreshResponse) {
|
||||
setIndexReadOnly(destinationIndexName);
|
||||
ensureSingleSegment(destinationIndexName, attempt);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
listener.onFailure(e);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
protected void ensureSingleSegment(final String destinationIndexName, final int attempt) {
|
||||
client.admin().indices().segments(new IndicesSegmentsRequest(destinationIndexName), new ActionListener<IndicesSegmentResponse>() {
|
||||
@Override
|
||||
public void onResponse(IndicesSegmentResponse indicesSegmentResponse) {
|
||||
IndexSegments indexSegments = indicesSegmentResponse.getIndices().get(destinationIndexName);
|
||||
if (indexSegments == null) {
|
||||
throw new ElasticsearchException("Could not locate segment information for newly created index [{}]",
|
||||
destinationIndexName);
|
||||
}
|
||||
Map<Integer, IndexShardSegments> indexShards = indexSegments.getShards();
|
||||
assert indexShards.size() == 1 : "Expected enrich index to contain only one shard";
|
||||
ShardSegments[] shardSegments = indexShards.get(0).getShards();
|
||||
assert shardSegments.length == 1 : "Expected enrich index to contain no replicas at this point";
|
||||
ShardSegments primarySegments = shardSegments[0];
|
||||
if (primarySegments.getSegments().size() > 1) {
|
||||
int nextAttempt = attempt + 1;
|
||||
if (nextAttempt > maxForceMergeAttempts) {
|
||||
listener.onFailure(new ElasticsearchException(
|
||||
"Force merging index [{}] attempted [{}] times but did not result in one segment.",
|
||||
destinationIndexName, attempt, maxForceMergeAttempts));
|
||||
} else {
|
||||
logger.debug("Policy [{}]: Force merge result contains more than one segment [{}], retrying (attempt {}/{})",
|
||||
policyName, primarySegments.getSegments().size(), nextAttempt, maxForceMergeAttempts);
|
||||
forceMergeEnrichIndex(destinationIndexName, nextAttempt);
|
||||
}
|
||||
} else {
|
||||
// Force merge down to one segment successful
|
||||
setIndexReadOnly(destinationIndexName);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -12,6 +12,7 @@ import java.util.Collections;
|
|||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
|
@ -37,6 +38,7 @@ import org.elasticsearch.cluster.service.ClusterService;
|
|||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentType;
|
||||
import org.elasticsearch.common.xcontent.smile.SmileXContent;
|
||||
import org.elasticsearch.index.engine.Segment;
|
||||
import org.elasticsearch.common.xcontent.json.JsonXContent;
|
||||
import org.elasticsearch.index.IndexNotFoundException;
|
||||
|
@ -1226,12 +1228,135 @@ public class EnrichPolicyRunnerTests extends ESSingleNodeTestCase {
|
|||
ensureEnrichIndexIsReadOnly(createdEnrichIndex);
|
||||
}
|
||||
|
||||
public void testRunnerWithForceMergeRetry() throws Exception {
|
||||
final String sourceIndex = "source-index";
|
||||
IndexResponse indexRequest = client().index(new IndexRequest()
|
||||
.index(sourceIndex)
|
||||
.id("id")
|
||||
.source(
|
||||
"{" +
|
||||
"\"field1\":\"value1\"," +
|
||||
"\"field2\":2," +
|
||||
"\"field3\":\"ignored\"," +
|
||||
"\"field4\":\"ignored\"," +
|
||||
"\"field5\":\"value5\"" +
|
||||
"}",
|
||||
XContentType.JSON)
|
||||
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
|
||||
).actionGet();
|
||||
assertEquals(RestStatus.CREATED, indexRequest.status());
|
||||
|
||||
SearchResponse sourceSearchResponse = client().search(
|
||||
new SearchRequest(sourceIndex)
|
||||
.source(SearchSourceBuilder.searchSource()
|
||||
.query(QueryBuilders.matchAllQuery()))).actionGet();
|
||||
assertThat(sourceSearchResponse.getHits().getTotalHits().value, equalTo(1L));
|
||||
Map<String, Object> sourceDocMap = sourceSearchResponse.getHits().getAt(0).getSourceAsMap();
|
||||
assertNotNull(sourceDocMap);
|
||||
assertThat(sourceDocMap.get("field1"), is(equalTo("value1")));
|
||||
assertThat(sourceDocMap.get("field2"), is(equalTo(2)));
|
||||
assertThat(sourceDocMap.get("field3"), is(equalTo("ignored")));
|
||||
assertThat(sourceDocMap.get("field4"), is(equalTo("ignored")));
|
||||
assertThat(sourceDocMap.get("field5"), is(equalTo("value5")));
|
||||
|
||||
List<String> enrichFields = Arrays.asList("field2", "field5");
|
||||
EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, Collections.singletonList(sourceIndex), "field1",
|
||||
enrichFields);
|
||||
String policyName = "test1";
|
||||
|
||||
final long createTime = randomNonNegativeLong();
|
||||
String createdEnrichIndex = ".enrich-test1-" + createTime;
|
||||
final AtomicReference<Exception> exception = new AtomicReference<>();
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
ActionListener<PolicyExecutionResult> listener = createTestListener(latch, exception::set);
|
||||
ClusterService clusterService = getInstanceFromNode(ClusterService.class);
|
||||
IndexNameExpressionResolver resolver = getInstanceFromNode(IndexNameExpressionResolver.class);
|
||||
AtomicInteger forceMergeAttempts = new AtomicInteger(0);
|
||||
final XContentBuilder unmergedDocument = SmileXContent.contentBuilder()
|
||||
.startObject().field("field1", "value1.1").field("field2", 2).field("field5", "value5").endObject();
|
||||
EnrichPolicyRunner enrichPolicyRunner = new EnrichPolicyRunner(policyName, policy, listener, clusterService, client(), resolver,
|
||||
() -> createTime, randomIntBetween(1, 10000), randomIntBetween(3, 10)) {
|
||||
@Override
|
||||
protected void ensureSingleSegment(String destinationIndexName, int attempt) {
|
||||
forceMergeAttempts.incrementAndGet();
|
||||
if (attempt == 1) {
|
||||
// Put and flush a document to increase the number of segments, simulating not
|
||||
// all segments were merged on the first try.
|
||||
IndexResponse indexRequest = client().index(new IndexRequest()
|
||||
.index(createdEnrichIndex)
|
||||
.source(unmergedDocument)
|
||||
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
|
||||
).actionGet();
|
||||
assertEquals(RestStatus.CREATED, indexRequest.status());
|
||||
}
|
||||
super.ensureSingleSegment(destinationIndexName, attempt);
|
||||
}
|
||||
};
|
||||
|
||||
logger.info("Starting policy run");
|
||||
enrichPolicyRunner.run();
|
||||
latch.await();
|
||||
if (exception.get() != null) {
|
||||
throw exception.get();
|
||||
}
|
||||
|
||||
// Validate number of force merges
|
||||
assertThat(forceMergeAttempts.get(), equalTo(2));
|
||||
|
||||
// Validate Index definition
|
||||
GetIndexResponse enrichIndex = client().admin().indices().getIndex(new GetIndexRequest().indices(".enrich-test1")).actionGet();
|
||||
assertThat(enrichIndex.getIndices().length, equalTo(1));
|
||||
assertThat(enrichIndex.getIndices()[0], equalTo(createdEnrichIndex));
|
||||
Settings settings = enrichIndex.getSettings().get(createdEnrichIndex);
|
||||
assertNotNull(settings);
|
||||
assertThat(settings.get("index.auto_expand_replicas"), is(equalTo("0-all")));
|
||||
|
||||
// Validate Mapping
|
||||
Map<String, Object> mapping = enrichIndex.getMappings().get(createdEnrichIndex).get("_doc").sourceAsMap();
|
||||
validateMappingMetadata(mapping, policyName, policy);
|
||||
assertThat(mapping.get("dynamic"), is("false"));
|
||||
Map<?, ?> properties = (Map<?, ?>) mapping.get("properties");
|
||||
assertNotNull(properties);
|
||||
assertThat(properties.size(), is(equalTo(1)));
|
||||
Map<?, ?> field1 = (Map<?, ?>) properties.get("field1");
|
||||
assertNotNull(field1);
|
||||
assertThat(field1.get("type"), is(equalTo("keyword")));
|
||||
assertThat(field1.get("doc_values"), is(false));
|
||||
|
||||
// Validate document structure
|
||||
SearchResponse allEnrichDocs = client().search(
|
||||
new SearchRequest(".enrich-test1")
|
||||
.source(SearchSourceBuilder.searchSource()
|
||||
.query(QueryBuilders.matchAllQuery()))).actionGet();
|
||||
assertThat(allEnrichDocs.getHits().getTotalHits().value, equalTo(2L));
|
||||
for (String keyValue : Arrays.asList("value1", "value1.1")) {
|
||||
SearchResponse enrichSearchResponse = client().search(
|
||||
new SearchRequest(".enrich-test1")
|
||||
.source(SearchSourceBuilder.searchSource()
|
||||
.query(QueryBuilders.matchQuery("field1", keyValue)))).actionGet();
|
||||
|
||||
assertThat(enrichSearchResponse.getHits().getTotalHits().value, equalTo(1L));
|
||||
Map<String, Object> enrichDocument = enrichSearchResponse.getHits().iterator().next().getSourceAsMap();
|
||||
assertNotNull(enrichDocument);
|
||||
assertThat(enrichDocument.size(), is(equalTo(3)));
|
||||
assertThat(enrichDocument.get("field1"), is(equalTo(keyValue)));
|
||||
assertThat(enrichDocument.get("field2"), is(equalTo(2)));
|
||||
assertThat(enrichDocument.get("field5"), is(equalTo("value5")));
|
||||
}
|
||||
|
||||
// Validate segments
|
||||
validateSegments(createdEnrichIndex, 2);
|
||||
|
||||
// Validate Index is read only
|
||||
ensureEnrichIndexIsReadOnly(createdEnrichIndex);
|
||||
}
|
||||
|
||||
private EnrichPolicyRunner createPolicyRunner(String policyName, EnrichPolicy policy, ActionListener<PolicyExecutionResult> listener,
|
||||
Long createTime) {
|
||||
ClusterService clusterService = getInstanceFromNode(ClusterService.class);
|
||||
IndexNameExpressionResolver resolver = getInstanceFromNode(IndexNameExpressionResolver.class);
|
||||
return new EnrichPolicyRunner(policyName, policy, listener, clusterService, client(), resolver, () -> createTime,
|
||||
randomIntBetween(1, 10000));
|
||||
randomIntBetween(1, 10000), randomIntBetween(1, 10));
|
||||
}
|
||||
|
||||
private ActionListener<PolicyExecutionResult> createTestListener(final CountDownLatch latch,
|
||||
|
|
Loading…
Reference in New Issue