Backport 7.x - Add step to forcemerge enrich index after reindex (#41969)

Adds a step in the policy execution that forcemerge's a new enrich index after reindex completes.
This commit is contained in:
James Baiera 2019-05-22 16:21:45 -04:00
parent 9e514cb161
commit 824ccfabd9
2 changed files with 63 additions and 3 deletions

View File

@ -21,6 +21,8 @@ import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest;
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeResponse;
import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
import org.elasticsearch.action.admin.indices.get.GetIndexResponse;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
@ -107,14 +109,14 @@ public class EnrichPolicyRunner implements Runnable {
logger.debug("Policy [{}]: Validating [{}] source mappings", policyName, sourceIndices);
for (String sourceIndex : sourceIndices) {
Map<String, Object> mapping = getMappings(getIndexResponse, sourceIndex);
Set<?> properties = ((Map<?, ?>) mapping.get("properties")).keySet();
Map<?, ?> properties = ((Map<?, ?>) mapping.get("properties"));
if (properties == null) {
listener.onFailure(
new ElasticsearchException(
"Enrich policy execution for [{}] failed. Could not read mapping for source [{}] included by pattern [{}]",
policyName, sourceIndex, policy.getIndices()));
}
if (properties.contains(policy.getEnrichKey()) == false) {
if (properties.containsKey(policy.getEnrichKey()) == false) {
listener.onFailure(
new ElasticsearchException(
"Enrich policy execution for [{}] failed. Could not locate enrich key field [{}] on mapping for index [{}]",
@ -209,7 +211,7 @@ public class EnrichPolicyRunner implements Runnable {
} else {
logger.info("Policy [{}]: Transferred [{}] documents to enrich index [{}]", policyName,
bulkByScrollResponse.getCreated(), destinationIndexName);
refreshEnrichIndex(destinationIndexName);
forceMergeEnrichIndex(destinationIndexName);
}
}
@ -220,6 +222,22 @@ public class EnrichPolicyRunner implements Runnable {
});
}
private void forceMergeEnrichIndex(final String destinationIndexName) {
logger.debug("Policy [{}]: Force merging newly created enrich index [{}]", policyName, destinationIndexName);
client.admin().indices().forceMerge(new ForceMergeRequest(destinationIndexName).maxNumSegments(1),
new ActionListener<ForceMergeResponse>() {
@Override
public void onResponse(ForceMergeResponse forceMergeResponse) {
refreshEnrichIndex(destinationIndexName);
}
@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
});
}
private void refreshEnrichIndex(final String destinationIndexName) {
logger.debug("Policy [{}]: Refreshing newly created enrich index [{}]", policyName, destinationIndexName);
client.admin().indices().refresh(new RefreshRequest(destinationIndexName), new ActionListener<RefreshResponse>() {

View File

@ -16,6 +16,11 @@ import java.util.concurrent.atomic.AtomicReference;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
import org.elasticsearch.action.admin.indices.get.GetIndexResponse;
import org.elasticsearch.action.admin.indices.segments.IndexSegments;
import org.elasticsearch.action.admin.indices.segments.IndexShardSegments;
import org.elasticsearch.action.admin.indices.segments.IndicesSegmentResponse;
import org.elasticsearch.action.admin.indices.segments.IndicesSegmentsRequest;
import org.elasticsearch.action.admin.indices.segments.ShardSegments;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequest;
@ -25,6 +30,7 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.engine.Segment;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.reindex.ReindexPlugin;
import org.elasticsearch.plugins.Plugin;
@ -118,6 +124,7 @@ public class EnrichPolicyRunnerTests extends ESSingleNodeTestCase {
throw exception.get();
}
// Validate Index definition
String createdEnrichIndex = ".enrich-test1-" + createTime;
GetIndexResponse enrichIndex = client().admin().indices()
.getIndex(new GetIndexRequest().indices(".enrich-test1")).get();
@ -126,6 +133,8 @@ public class EnrichPolicyRunnerTests extends ESSingleNodeTestCase {
Settings settings = enrichIndex.getSettings().get(createdEnrichIndex);
assertNotNull(settings);
assertThat(settings.get("index.auto_expand_replicas"), is(equalTo("0-all")));
// Validate Mapping
Map<String, Object> mapping = enrichIndex.getMappings().get(createdEnrichIndex).get("_doc").sourceAsMap();
assertThat(mapping.get("dynamic"), is("false"));
Map<?, ?> properties = (Map<?, ?>) mapping.get("properties");
@ -136,6 +145,7 @@ public class EnrichPolicyRunnerTests extends ESSingleNodeTestCase {
assertThat(field1.get("type"), is(equalTo("keyword")));
assertThat(field1.get("doc_values"), is(false));
// Validate document structure
SearchResponse enrichSearchResponse = client().search(
new SearchRequest(".enrich-test1")
.source(SearchSourceBuilder.searchSource()
@ -148,6 +158,20 @@ public class EnrichPolicyRunnerTests extends ESSingleNodeTestCase {
assertThat(enrichDocument.get("field1"), is(equalTo("value1")));
assertThat(enrichDocument.get("field2"), is(equalTo(2)));
assertThat(enrichDocument.get("field5"), is(equalTo("value5")));
// Validate segments
IndicesSegmentResponse indicesSegmentResponse = client().admin().indices()
.segments(new IndicesSegmentsRequest(createdEnrichIndex)).get();
IndexSegments indexSegments = indicesSegmentResponse.getIndices().get(createdEnrichIndex);
assertNotNull(indexSegments);
assertThat(indexSegments.getShards().size(), is(equalTo(1)));
IndexShardSegments shardSegments = indexSegments.getShards().get(0);
assertNotNull(shardSegments);
assertThat(shardSegments.getShards().length, is(equalTo(1)));
ShardSegments shard = shardSegments.getShards()[0];
assertThat(shard.getSegments().size(), is(equalTo(1)));
Segment segment = shard.getSegments().iterator().next();
assertThat(segment.getNumDocs(), is(equalTo(1)));
}
public void testRunnerMultiSource() throws Exception {
@ -234,6 +258,7 @@ public class EnrichPolicyRunnerTests extends ESSingleNodeTestCase {
throw exception.get();
}
// Validate Index definition
String createdEnrichIndex = ".enrich-test1-" + createTime;
GetIndexResponse enrichIndex = client().admin().indices().getIndex(new GetIndexRequest().indices(".enrich-test1")).actionGet();
assertThat(enrichIndex.getIndices().length, equalTo(1));
@ -241,6 +266,8 @@ public class EnrichPolicyRunnerTests extends ESSingleNodeTestCase {
Settings settings = enrichIndex.getSettings().get(createdEnrichIndex);
assertNotNull(settings);
assertThat(settings.get("index.auto_expand_replicas"), is(equalTo("0-all")));
// Validate Mapping
Map<String, Object> mapping = enrichIndex.getMappings().get(createdEnrichIndex).get("_doc").sourceAsMap();
assertThat(mapping.get("dynamic"), is("false"));
Map<?, ?> properties = (Map<?, ?>) mapping.get("properties");
@ -251,6 +278,7 @@ public class EnrichPolicyRunnerTests extends ESSingleNodeTestCase {
assertThat(field1.get("type"), is(equalTo("keyword")));
assertThat(field1.get("doc_values"), is(false));
// Validate document structure
SearchResponse enrichSearchResponse = client().search(
new SearchRequest(".enrich-test1")
.source(SearchSourceBuilder.searchSource()
@ -263,5 +291,19 @@ public class EnrichPolicyRunnerTests extends ESSingleNodeTestCase {
assertThat(enrichDocument.get("field1"), is(equalTo("value1")));
assertThat(enrichDocument.get("field2"), is(equalTo(2)));
assertThat(enrichDocument.get("field5"), is(equalTo("value5")));
// Validate segments
IndicesSegmentResponse indicesSegmentResponse = client().admin().indices()
.segments(new IndicesSegmentsRequest(createdEnrichIndex)).get();
IndexSegments indexSegments = indicesSegmentResponse.getIndices().get(createdEnrichIndex);
assertNotNull(indexSegments);
assertThat(indexSegments.getShards().size(), is(equalTo(1)));
IndexShardSegments shardSegments = indexSegments.getShards().get(0);
assertNotNull(shardSegments);
assertThat(shardSegments.getShards().length, is(equalTo(1)));
ShardSegments shard = shardSegments.getShards()[0];
assertThat(shard.getSegments().size(), is(equalTo(1)));
Segment segment = shard.getSegments().iterator().next();
assertThat(segment.getNumDocs(), is(equalTo(3)));
}
}