From befe2a45b98cf7845789b8f666a33c0c1a6fff00 Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Fri, 3 May 2019 13:36:59 +0200 Subject: [PATCH] [ML-DataFrame] refactor pivot to only take the pivot config (#41763) refactor pivot class to only take the config at construction, other parameters are passed in as part of method that require them --- .../dataframe/transforms/QueryConfig.java | 2 +- ...nsportPreviewDataFrameTransformAction.java | 13 +++-- .../TransportPutDataFrameTransformAction.java | 6 +-- ...ransportStartDataFrameTransformAction.java | 6 +-- .../transforms/DataFrameIndexer.java | 6 +-- .../dataframe/transforms/pivot/Pivot.java | 54 +++++++++---------- .../transforms/pivot/PivotTests.java | 43 ++++++++------- 7 files changed, 63 insertions(+), 67 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/QueryConfig.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/QueryConfig.java index 670b1009d29..981a56e639d 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/QueryConfig.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/QueryConfig.java @@ -39,7 +39,7 @@ public class QueryConfig extends AbstractDiffable implements Writea private final Map source; private final QueryBuilder query; - static QueryConfig matchAll() { + public static QueryConfig matchAll() { return new QueryConfig(Collections.singletonMap(MatchAllQueryBuilder.NAME, Collections.emptyMap()), new MatchAllQueryBuilder()); } diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportPreviewDataFrameTransformAction.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportPreviewDataFrameTransformAction.java index 5b361273050..36943f39f8e 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportPreviewDataFrameTransformAction.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportPreviewDataFrameTransformAction.java @@ -24,6 +24,7 @@ import org.elasticsearch.xpack.core.XPackField; import org.elasticsearch.xpack.core.dataframe.action.PreviewDataFrameTransformAction; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameIndexerTransformStats; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig; +import org.elasticsearch.xpack.core.dataframe.transforms.SourceConfig; import org.elasticsearch.xpack.dataframe.transforms.pivot.Pivot; import java.util.List; @@ -61,24 +62,22 @@ public class TransportPreviewDataFrameTransformAction extends final DataFrameTransformConfig config = request.getConfig(); - Pivot pivot = new Pivot(config.getSource().getIndex(), - config.getSource().getQueryConfig().getQuery(), - config.getPivotConfig()); + Pivot pivot = new Pivot(config.getPivotConfig()); - getPreview(pivot, ActionListener.wrap( + getPreview(pivot, config.getSource(), ActionListener.wrap( previewResponse -> listener.onResponse(new PreviewDataFrameTransformAction.Response(previewResponse)), listener::onFailure )); } - private void getPreview(Pivot pivot, ActionListener>> listener) { - pivot.deduceMappings(client, ActionListener.wrap( + private void getPreview(Pivot pivot, SourceConfig source, ActionListener>> listener) { + pivot.deduceMappings(client, source, ActionListener.wrap( deducedMappings -> { ClientHelper.executeWithHeadersAsync(threadPool.getThreadContext().getHeaders(), ClientHelper.DATA_FRAME_ORIGIN, client, SearchAction.INSTANCE, - pivot.buildSearchRequest(null, NUMBER_OF_PREVIEW_BUCKETS), + pivot.buildSearchRequest(source, null, NUMBER_OF_PREVIEW_BUCKETS), ActionListener.wrap( r -> { final CompositeAggregation agg = r.getAggregations().get(COMPOSITE_AGGREGATION_NAME); diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportPutDataFrameTransformAction.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportPutDataFrameTransformAction.java index edcd0689290..e761c33d569 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportPutDataFrameTransformAction.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportPutDataFrameTransformAction.java @@ -190,9 +190,7 @@ public class TransportPutDataFrameTransformAction private void putDataFrame(DataFrameTransformConfig config, ActionListener listener) { - final Pivot pivot = new Pivot(config.getSource().getIndex(), - config.getSource().getQueryConfig().getQuery(), - config.getPivotConfig()); + final Pivot pivot = new Pivot(config.getPivotConfig()); // <5> Return the listener, or clean up destination index on failure. @@ -210,6 +208,6 @@ public class TransportPutDataFrameTransformAction ); // <1> Validate our pivot - pivot.validate(client, pivotValidationListener); + pivot.validate(client, config.getSource(), pivotValidationListener); } } diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStartDataFrameTransformAction.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStartDataFrameTransformAction.java index 98e70fa2578..39c0c74bbd5 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStartDataFrameTransformAction.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStartDataFrameTransformAction.java @@ -224,9 +224,7 @@ public class TransportStartDataFrameTransformAction extends private void createDestinationIndex(final DataFrameTransformConfig config, final ActionListener listener) { - final Pivot pivot = new Pivot(config.getSource().getIndex(), - config.getSource().getQueryConfig().getQuery(), - config.getPivotConfig()); + final Pivot pivot = new Pivot(config.getPivotConfig()); ActionListener> deduceMappingsListener = ActionListener.wrap( mappings -> DataframeIndex.createDestinationIndex(client, @@ -238,7 +236,7 @@ public class TransportStartDataFrameTransformAction extends deduceTargetMappingsException)) ); - pivot.deduceMappings(client, deduceMappingsListener); + pivot.deduceMappings(client, config.getSource(), deduceMappingsListener); } @Override diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameIndexer.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameIndexer.java index 8fd170520d3..142388cb10f 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameIndexer.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameIndexer.java @@ -16,7 +16,6 @@ import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.ShardSearchFailure; import org.elasticsearch.common.breaker.CircuitBreakingException; import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregation; import org.elasticsearch.xpack.core.dataframe.DataFrameField; import org.elasticsearch.xpack.core.dataframe.DataFrameMessages; @@ -97,8 +96,7 @@ public abstract class DataFrameIndexer extends AsyncTwoPhaseIndexer listener) { try { - QueryBuilder queryBuilder = getConfig().getSource().getQueryConfig().getQuery(); - pivot = new Pivot(getConfig().getSource().getIndex(), queryBuilder, getConfig().getPivotConfig()); + pivot = new Pivot(getConfig().getPivotConfig()); // if we haven't set the page size yet, if it is set we might have reduced it after running into an out of memory if (pageSize == 0) { @@ -180,7 +178,7 @@ public abstract class DataFrameIndexer extends AsyncTwoPhaseIndexer listener) { + public void validate(Client client, SourceConfig sourceConfig, final ActionListener listener) { // step 1: check if used aggregations are supported for (AggregationBuilder agg : config.getAggregationConfig().getAggregatorFactories()) { if (Aggregations.isSupportedByDataframe(agg.getType()) == false) { @@ -64,11 +62,11 @@ public class Pivot { } // step 2: run a query to validate that config is valid - runTestQuery(client, listener); + runTestQuery(client, sourceConfig, listener); } - public void deduceMappings(Client client, final ActionListener> listener) { - SchemaUtil.deduceMappings(client, config, source, listener); + public void deduceMappings(Client client, SourceConfig sourceConfig, final ActionListener> listener) { + SchemaUtil.deduceMappings(client, config, sourceConfig.getIndex(), listener); } /** @@ -87,14 +85,24 @@ public class Pivot { return DEFAULT_INITIAL_PAGE_SIZE; } - public SearchRequest buildSearchRequest(Map position, int pageSize) { - if (position != null) { - cachedCompositeAggregation.aggregateAfter(position); - } + public SearchRequest buildSearchRequest(SourceConfig sourceConfig, Map position, int pageSize) { + QueryBuilder queryBuilder = sourceConfig.getQueryConfig().getQuery(); + SearchRequest searchRequest = new SearchRequest(sourceConfig.getIndex()); + SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); + sourceBuilder.aggregation(buildAggregation(position, pageSize)); + sourceBuilder.size(0); + sourceBuilder.query(queryBuilder); + searchRequest.source(sourceBuilder); + return searchRequest; + + } + + public AggregationBuilder buildAggregation(Map position, int pageSize) { + cachedCompositeAggregation.aggregateAfter(position); cachedCompositeAggregation.size(pageSize); - return cachedSearchRequest; + return cachedCompositeAggregation; } public Stream> extractResults(CompositeAggregation agg, @@ -113,10 +121,10 @@ public class Pivot { dataFrameIndexerTransformStats); } - private void runTestQuery(Client client, final ActionListener listener) { - // no after key - cachedCompositeAggregation.aggregateAfter(null); - client.execute(SearchAction.INSTANCE, cachedSearchRequest, ActionListener.wrap(response -> { + private void runTestQuery(Client client, SourceConfig sourceConfig, final ActionListener listener) { + SearchRequest searchRequest = buildSearchRequest(sourceConfig, null, TEST_QUERY_PAGE_SIZE); + + client.execute(SearchAction.INSTANCE, searchRequest, ActionListener.wrap(response -> { if (response == null) { listener.onFailure(new RuntimeException("Unexpected null response from test query")); return; @@ -131,16 +139,6 @@ public class Pivot { })); } - private static SearchRequest createSearchRequest(String[] index, QueryBuilder query, CompositeAggregationBuilder compositeAggregation) { - SearchRequest searchRequest = new SearchRequest(index); - SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); - sourceBuilder.aggregation(compositeAggregation); - sourceBuilder.size(0); - sourceBuilder.query(query); - searchRequest.source(sourceBuilder); - return searchRequest; - } - private static CompositeAggregationBuilder createCompositeAggregation(PivotConfig config) { CompositeAggregationBuilder compositeAggregation; diff --git a/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/pivot/PivotTests.java b/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/pivot/PivotTests.java index d4607d7adc3..172868833f3 100644 --- a/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/pivot/PivotTests.java +++ b/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/pivot/PivotTests.java @@ -22,12 +22,13 @@ import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.IndexNotFoundException; -import org.elasticsearch.index.query.MatchAllQueryBuilder; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHits; import org.elasticsearch.search.SearchModule; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.client.NoOpClient; +import org.elasticsearch.xpack.core.dataframe.transforms.QueryConfig; +import org.elasticsearch.xpack.core.dataframe.transforms.SourceConfig; import org.elasticsearch.xpack.core.dataframe.transforms.pivot.AggregationConfig; import org.elasticsearch.xpack.core.dataframe.transforms.pivot.GroupConfigTests; import org.elasticsearch.xpack.core.dataframe.transforms.pivot.PivotConfig; @@ -83,42 +84,46 @@ public class PivotTests extends ESTestCase { } public void testValidateExistingIndex() throws Exception { - Pivot pivot = new Pivot(new String[]{"existing_source_index"}, new MatchAllQueryBuilder(), getValidPivotConfig()); + SourceConfig source = new SourceConfig(new String[]{"existing_source_index"}, QueryConfig.matchAll()); + Pivot pivot = new Pivot(getValidPivotConfig()); - assertValidTransform(client, pivot); + assertValidTransform(client, source, pivot); } public void testValidateNonExistingIndex() throws Exception { - Pivot pivot = new Pivot(new String[]{"non_existing_source_index"}, new MatchAllQueryBuilder(), getValidPivotConfig()); + SourceConfig source = new SourceConfig(new String[]{"non_existing_source_index"}, QueryConfig.matchAll()); + Pivot pivot = new Pivot(getValidPivotConfig()); - assertInvalidTransform(client, pivot); + assertInvalidTransform(client, source, pivot); } public void testSearchFailure() throws Exception { // test a failure during the search operation, transform creation fails if // search has failures although they might just be temporary - Pivot pivot = new Pivot(new String[]{"existing_source_index_with_failing_shards"}, - new MatchAllQueryBuilder(), - getValidPivotConfig()); + SourceConfig source = new SourceConfig(new String[] { "existing_source_index_with_failing_shards" }, QueryConfig.matchAll()); - assertInvalidTransform(client, pivot); + Pivot pivot = new Pivot(getValidPivotConfig()); + + assertInvalidTransform(client, source, pivot); } public void testValidateAllSupportedAggregations() throws Exception { for (String agg : supportedAggregations) { AggregationConfig aggregationConfig = getAggregationConfig(agg); + SourceConfig source = new SourceConfig(new String[]{"existing_source"}, QueryConfig.matchAll()); - Pivot pivot = new Pivot(new String[]{"existing_source"}, new MatchAllQueryBuilder(), getValidPivotConfig(aggregationConfig)); - assertValidTransform(client, pivot); + Pivot pivot = new Pivot(getValidPivotConfig(aggregationConfig)); + assertValidTransform(client, source, pivot); } } public void testValidateAllUnsupportedAggregations() throws Exception { for (String agg : unsupportedAggregations) { AggregationConfig aggregationConfig = getAggregationConfig(agg); + SourceConfig source = new SourceConfig(new String[]{"existing_source"}, QueryConfig.matchAll()); - Pivot pivot = new Pivot(new String[]{"existing_source"}, new MatchAllQueryBuilder(), getValidPivotConfig(aggregationConfig)); - assertInvalidTransform(client, pivot); + Pivot pivot = new Pivot(getValidPivotConfig(aggregationConfig)); + assertInvalidTransform(client, source, pivot); } } @@ -202,18 +207,18 @@ public class PivotTests extends ESTestCase { return AggregationConfig.fromXContent(parser, false); } - private static void assertValidTransform(Client client, Pivot pivot) throws Exception { - validate(client, pivot, true); + private static void assertValidTransform(Client client, SourceConfig source, Pivot pivot) throws Exception { + validate(client, source, pivot, true); } - private static void assertInvalidTransform(Client client, Pivot pivot) throws Exception { - validate(client, pivot, false); + private static void assertInvalidTransform(Client client, SourceConfig source, Pivot pivot) throws Exception { + validate(client, source, pivot, false); } - private static void validate(Client client, Pivot pivot, boolean expectValid) throws Exception { + private static void validate(Client client, SourceConfig source, Pivot pivot, boolean expectValid) throws Exception { CountDownLatch latch = new CountDownLatch(1); final AtomicReference exceptionHolder = new AtomicReference<>(); - pivot.validate(client, ActionListener.wrap(validity -> { + pivot.validate(client, source, ActionListener.wrap(validity -> { assertEquals(expectValid, validity); latch.countDown(); }, e -> {