[ML] adding pivot.max_search_page_size option for setting paging size (#41920) (#42079)

* [ML] adding pivot.size option for setting paging size

* Changing field name to address PR comments

* fixing ctor usage

* adjust hlrc for field name change
Benjamin Trent 2019-05-10 13:22:31 -05:00 committed by GitHub
parent 0931815355
commit febee07dcc
14 changed files with 154 additions and 32 deletions


@@ -39,25 +39,29 @@ public class PivotConfig implements ToXContentObject {
private static final ParseField GROUP_BY = new ParseField("group_by");
private static final ParseField AGGREGATIONS = new ParseField("aggregations");
private static final ParseField MAX_PAGE_SEARCH_SIZE = new ParseField("max_page_search_size");
private final GroupConfig groups;
private final AggregationConfig aggregationConfig;
private final Integer maxPageSearchSize;
private static final ConstructingObjectParser<PivotConfig, Void> PARSER = new ConstructingObjectParser<>("pivot_config", true,
args -> new PivotConfig((GroupConfig) args[0], (AggregationConfig) args[1]));
args -> new PivotConfig((GroupConfig) args[0], (AggregationConfig) args[1], (Integer) args[2]));
static {
PARSER.declareObject(constructorArg(), (p, c) -> (GroupConfig.fromXContent(p)), GROUP_BY);
PARSER.declareObject(optionalConstructorArg(), (p, c) -> AggregationConfig.fromXContent(p), AGGREGATIONS);
PARSER.declareInt(optionalConstructorArg(), MAX_PAGE_SEARCH_SIZE);
}
public static PivotConfig fromXContent(final XContentParser parser) {
return PARSER.apply(parser, null);
}
PivotConfig(GroupConfig groups, final AggregationConfig aggregationConfig) {
PivotConfig(GroupConfig groups, final AggregationConfig aggregationConfig, Integer maxPageSearchSize) {
this.groups = groups;
this.aggregationConfig = aggregationConfig;
this.maxPageSearchSize = maxPageSearchSize;
}
@Override
@@ -65,6 +69,9 @@ public class PivotConfig implements ToXContentObject {
builder.startObject();
builder.field(GROUP_BY.getPreferredName(), groups);
builder.field(AGGREGATIONS.getPreferredName(), aggregationConfig);
if (maxPageSearchSize != null) {
builder.field(MAX_PAGE_SEARCH_SIZE.getPreferredName(), maxPageSearchSize);
}
builder.endObject();
return builder;
}
@@ -77,6 +84,10 @@ public class PivotConfig implements ToXContentObject {
return groups;
}
public Integer getMaxPageSearchSize() {
return maxPageSearchSize;
}
@Override
public boolean equals(Object other) {
if (this == other) {
@@ -89,12 +100,14 @@ public class PivotConfig implements ToXContentObject {
final PivotConfig that = (PivotConfig) other;
return Objects.equals(this.groups, that.groups) && Objects.equals(this.aggregationConfig, that.aggregationConfig);
return Objects.equals(this.groups, that.groups)
&& Objects.equals(this.aggregationConfig, that.aggregationConfig)
&& Objects.equals(this.maxPageSearchSize, that.maxPageSearchSize);
}
@Override
public int hashCode() {
return Objects.hash(groups, aggregationConfig);
return Objects.hash(groups, aggregationConfig, maxPageSearchSize);
}
public static Builder builder() {
@@ -104,6 +117,7 @@ public class PivotConfig implements ToXContentObject {
public static class Builder {
private GroupConfig groups;
private AggregationConfig aggregationConfig;
private Integer maxPageSearchSize;
/**
* Set how to group the source data
@@ -135,8 +149,22 @@ public class PivotConfig implements ToXContentObject {
return this;
}
/**
* Sets the maximum paging size ({@code maxPageSearchSize}) that the data frame transform can use when
* pulling the data from the source index.
*
* If an out of memory error is triggered, the page size is dynamically reduced so that the transform can continue to gather data.
*
* @param maxPageSearchSize Integer value between 10 and 10_000
* @return the {@link Builder} with the paging size set.
*/
public Builder setMaxPageSearchSize(Integer maxPageSearchSize) {
this.maxPageSearchSize = maxPageSearchSize;
return this;
}
public PivotConfig build() {
return new PivotConfig(groups, aggregationConfig);
return new PivotConfig(groups, aggregationConfig, maxPageSearchSize);
}
}
}
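The client-side change boils down to one optional builder parameter. As a minimal sketch (not part of the commit), assuming the 7.x high-level REST client package names, a caller could wire it up like this, with groupConfig and aggConfig built elsewhere (for example as in the documentation test further down):

import org.elasticsearch.client.dataframe.transforms.pivot.AggregationConfig;
import org.elasticsearch.client.dataframe.transforms.pivot.GroupConfig;
import org.elasticsearch.client.dataframe.transforms.pivot.PivotConfig;

final class PivotConfigSample {
    // Sketch only: groupConfig and aggConfig are assumed to be built elsewhere,
    // e.g. as in DataFrameTransformDocumentationIT further down this commit.
    static PivotConfig withPageLimit(GroupConfig groupConfig, AggregationConfig aggConfig) {
        return PivotConfig.builder()
            .setGroups(groupConfig)          // required group_by definition
            .setAggregationConfig(aggConfig) // aggregations that reduce the data
            .setMaxPageSearchSize(500)       // new optional field, validated server-side to 10..10_000
            .build();
    }
}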


@@ -32,7 +32,9 @@ import java.util.function.Predicate;
public class PivotConfigTests extends AbstractXContentTestCase<PivotConfig> {
public static PivotConfig randomPivotConfig() {
return new PivotConfig(GroupConfigTests.randomGroupConfig(), AggregationConfigTests.randomAggregationConfig());
return new PivotConfig(GroupConfigTests.randomGroupConfig(),
AggregationConfigTests.randomAggregationConfig(),
randomBoolean() ? null : randomIntBetween(10, 10_000));
}
@Override


@@ -137,8 +137,9 @@ public class DataFrameTransformDocumentationIT extends ESRestHighLevelClientTest
// end::put-data-frame-transform-agg-config
// tag::put-data-frame-transform-pivot-config
PivotConfig pivotConfig = PivotConfig.builder()
.setGroups(groupConfig)
.setAggregationConfig(aggConfig)
.setGroups(groupConfig) // <1>
.setAggregationConfig(aggConfig) // <2>
.setMaxPageSearchSize(1000) // <3>
.build();
// end::put-data-frame-transform-pivot-config
// tag::put-data-frame-transform-config


@@ -66,6 +66,11 @@ Defines the pivot function `group by` fields and the aggregation to reduce the d
--------------------------------------------------
include-tagged::{doc-tests-file}[{api}-pivot-config]
--------------------------------------------------
<1> The `GroupConfig` to use in the pivot
<2> The aggregations to use
<3> The maximum paging size for the transform when pulling data
from the source. The size is adjusted dynamically while the transform
runs to recover from and prevent OOM issues.
===== GroupConfig
The grouping terms. Defines the group by and destination fields


@@ -27,6 +27,7 @@ public final class DataFrameField {
public static final ParseField SOURCE = new ParseField("source");
public static final ParseField DESTINATION = new ParseField("dest");
public static final ParseField FORCE = new ParseField("force");
public static final ParseField MAX_PAGE_SEARCH_SIZE = new ParseField("max_page_search_size");
/**
* Fields for checkpointing


@@ -56,6 +56,14 @@ public class PutDataFrameTransformAction extends Action<AcknowledgedResponse> {
@Override
public ActionRequestValidationException validate() {
ActionRequestValidationException validationException = null;
if (config.getPivotConfig() != null
&& config.getPivotConfig().getMaxPageSearchSize() != null
&& (config.getPivotConfig().getMaxPageSearchSize() < 10 || config.getPivotConfig().getMaxPageSearchSize() > 10_000)) {
validationException = addValidationError(
"pivot.max_page_search_size [" +
config.getPivotConfig().getMaxPageSearchSize() + "] must be between 10 and 10,000",
validationException);
}
for (String failure : config.getPivotConfig().aggFieldValidation()) {
validationException = addValidationError(failure, validationException);
}
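The bounds check above is the only server-side gate on the new field; null simply falls through to the default. A standalone sketch of the same rule (a hypothetical helper, not in the commit), handy for eyeballing the boundary behavior:

// Mirrors the validation above: null is allowed (the default page size applies),
// and both 10 and 10_000 are themselves accepted values.
static boolean isValidMaxPageSearchSize(Integer size) {
    return size == null || (size >= 10 && size <= 10_000);
}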


@@ -6,6 +6,7 @@
package org.elasticsearch.xpack.core.dataframe.transforms.pivot;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
@@ -35,6 +36,7 @@ public class PivotConfig implements Writeable, ToXContentObject {
private static final String NAME = "data_frame_transform_pivot";
private final GroupConfig groups;
private final AggregationConfig aggregationConfig;
private final Integer maxPageSearchSize;
private static final ConstructingObjectParser<PivotConfig, Void> STRICT_PARSER = createParser(false);
private static final ConstructingObjectParser<PivotConfig, Void> LENIENT_PARSER = createParser(true);
@@ -61,7 +63,7 @@ public class PivotConfig implements Writeable, ToXContentObject {
throw new IllegalArgumentException("Required [aggregations]");
}
return new PivotConfig(groups, aggregationConfig);
return new PivotConfig(groups, aggregationConfig, (Integer)args[3]);
});
parser.declareObject(constructorArg(),
@@ -69,18 +71,21 @@ public class PivotConfig implements Writeable, ToXContentObject {
parser.declareObject(optionalConstructorArg(), (p, c) -> AggregationConfig.fromXContent(p, lenient), DataFrameField.AGGREGATIONS);
parser.declareObject(optionalConstructorArg(), (p, c) -> AggregationConfig.fromXContent(p, lenient), DataFrameField.AGGS);
parser.declareInt(optionalConstructorArg(), DataFrameField.MAX_PAGE_SEARCH_SIZE);
return parser;
}
public PivotConfig(final GroupConfig groups, final AggregationConfig aggregationConfig) {
public PivotConfig(final GroupConfig groups, final AggregationConfig aggregationConfig, Integer maxPageSearchSize) {
this.groups = ExceptionsHelper.requireNonNull(groups, DataFrameField.GROUP_BY.getPreferredName());
this.aggregationConfig = ExceptionsHelper.requireNonNull(aggregationConfig, DataFrameField.AGGREGATIONS.getPreferredName());
this.maxPageSearchSize = maxPageSearchSize;
}
public PivotConfig(StreamInput in) throws IOException {
this.groups = new GroupConfig(in);
this.aggregationConfig = new AggregationConfig(in);
this.maxPageSearchSize = in.readOptionalInt();
}
@Override
@@ -88,6 +93,9 @@ public class PivotConfig implements Writeable, ToXContentObject {
builder.startObject();
builder.field(DataFrameField.GROUP_BY.getPreferredName(), groups);
builder.field(DataFrameField.AGGREGATIONS.getPreferredName(), aggregationConfig);
if (maxPageSearchSize != null) {
builder.field(DataFrameField.MAX_PAGE_SEARCH_SIZE.getPreferredName(), maxPageSearchSize);
}
builder.endObject();
return builder;
}
@@ -113,6 +121,7 @@ public class PivotConfig implements Writeable, ToXContentObject {
public void writeTo(StreamOutput out) throws IOException {
groups.writeTo(out);
aggregationConfig.writeTo(out);
out.writeOptionalInt(maxPageSearchSize);
}
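Since the field crosses the transport wire as an optional int, configs written by nodes that never set it round-trip as null. A minimal sketch of that round trip (test scaffolding assumed, using the same stream classes imported above):

import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;

final class OptionalIntRoundTrip {
    // Sketch: an unset maxPageSearchSize serializes as an absent optional int.
    static void demo() throws java.io.IOException {
        BytesStreamOutput out = new BytesStreamOutput();
        out.writeOptionalInt(null);                 // field unset on the writer side
        StreamInput in = out.bytes().streamInput();
        assert in.readOptionalInt() == null;        // still unset after the round trip
    }
}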
public AggregationConfig getAggregationConfig() {
@@ -123,6 +132,11 @@ public class PivotConfig implements Writeable, ToXContentObject {
return groups;
}
@Nullable
public Integer getMaxPageSearchSize() {
return maxPageSearchSize;
}
@Override
public boolean equals(Object other) {
if (this == other) {
@@ -135,12 +149,14 @@ public class PivotConfig implements Writeable, ToXContentObject {
final PivotConfig that = (PivotConfig) other;
return Objects.equals(this.groups, that.groups) && Objects.equals(this.aggregationConfig, that.aggregationConfig);
return Objects.equals(this.groups, that.groups)
&& Objects.equals(this.aggregationConfig, that.aggregationConfig)
&& Objects.equals(this.maxPageSearchSize, that.maxPageSearchSize);
}
@Override
public int hashCode() {
return Objects.hash(groups, aggregationConfig);
return Objects.hash(groups, aggregationConfig, maxPageSearchSize);
}
public boolean isValid() {


@@ -24,11 +24,15 @@ import static org.hamcrest.Matchers.empty;
public class PivotConfigTests extends AbstractSerializingDataFrameTestCase<PivotConfig> {
public static PivotConfig randomPivotConfig() {
return new PivotConfig(GroupConfigTests.randomGroupConfig(), AggregationConfigTests.randomAggregationConfig());
return new PivotConfig(GroupConfigTests.randomGroupConfig(),
AggregationConfigTests.randomAggregationConfig(),
randomBoolean() ? null : randomIntBetween(10, 10_000));
}
public static PivotConfig randomInvalidPivotConfig() {
return new PivotConfig(GroupConfigTests.randomGroupConfig(), AggregationConfigTests.randomInvalidAggregationConfig());
return new PivotConfig(GroupConfigTests.randomGroupConfig(),
AggregationConfigTests.randomInvalidAggregationConfig(),
randomBoolean() ? null : randomIntBetween(10, 10_000));
}
@Override


@@ -172,7 +172,13 @@ abstract class DataFrameIntegTestCase extends ESIntegTestCase {
protected PivotConfig createPivotConfig(Map<String, SingleGroupSource> groups,
AggregatorFactories.Builder aggregations) throws Exception {
return new PivotConfig(createGroupConfig(groups), createAggConfig(aggregations));
return createPivotConfig(groups, aggregations, null);
}
protected PivotConfig createPivotConfig(Map<String, SingleGroupSource> groups,
AggregatorFactories.Builder aggregations,
Integer size) throws Exception {
return new PivotConfig(createGroupConfig(groups), createAggConfig(aggregations), size);
}
protected DataFrameTransformConfig createTransformConfig(String id,


@@ -130,7 +130,7 @@ public class DataFrameTransformProgressIT extends ESIntegTestCase {
AggregatorFactories.Builder aggs = new AggregatorFactories.Builder();
aggs.addAggregator(AggregationBuilders.avg("avg_rating").field("stars"));
AggregationConfig aggregationConfig = new AggregationConfig(Collections.emptyMap(), aggs);
PivotConfig pivotConfig = new PivotConfig(histgramGroupConfig, aggregationConfig);
PivotConfig pivotConfig = new PivotConfig(histgramGroupConfig, aggregationConfig, null);
DataFrameTransformConfig config = new DataFrameTransformConfig("get_progress_transform",
sourceConfig,
destConfig,
@@ -149,7 +149,7 @@ public class DataFrameTransformProgressIT extends ESIntegTestCase {
QueryConfig queryConfig = new QueryConfig(Collections.emptyMap(), QueryBuilders.termQuery("user_id", "user_26"));
pivotConfig = new PivotConfig(histgramGroupConfig, aggregationConfig);
pivotConfig = new PivotConfig(histgramGroupConfig, aggregationConfig, null);
sourceConfig = new SourceConfig(new String[]{REVIEWS_INDEX_NAME}, queryConfig);
config = new DataFrameTransformConfig("get_progress_transform",
sourceConfig,


@@ -76,13 +76,15 @@ public class Pivot {
* the page size, the type of aggregations and the data. As the page size is the number of buckets we return
* per page, it is a multiplier for the cost of aggregating each bucket.
*
* Initially this returns a default, in future it might inspect the configuration and base the initial size
* on the aggregations used.
* The user may set a maximum via {@link PivotConfig#getMaxPageSearchSize()}, but if that is not provided,
* the default {@link Pivot#DEFAULT_INITIAL_PAGE_SIZE} is used.
*
* In future we might inspect the configuration and base the initial size on the aggregations used.
*
* @return the page size
*/
public int getInitialPageSize() {
return DEFAULT_INITIAL_PAGE_SIZE;
return config.getMaxPageSearchSize() == null ? DEFAULT_INITIAL_PAGE_SIZE : config.getMaxPageSearchSize();
}
public SearchRequest buildSearchRequest(SourceConfig sourceConfig, Map<String, Object> position, int pageSize) {


@@ -23,7 +23,9 @@ import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameIndexerTransformStats;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfigTests;
import org.elasticsearch.xpack.core.dataframe.transforms.pivot.AggregationConfigTests;
import org.elasticsearch.xpack.core.dataframe.transforms.pivot.GroupConfigTests;
import org.elasticsearch.xpack.core.dataframe.transforms.pivot.PivotConfig;
import org.elasticsearch.xpack.core.indexing.IndexerState;
import org.elasticsearch.xpack.dataframe.notifications.DataFrameAuditor;
import org.elasticsearch.xpack.dataframe.transforms.pivot.Pivot;
@@ -39,7 +41,10 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import static org.elasticsearch.xpack.core.dataframe.transforms.DestConfigTests.randomDestConfig;
import static org.elasticsearch.xpack.core.dataframe.transforms.SourceConfigTests.randomSourceConfig;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -169,9 +174,15 @@ public class DataFrameIndexerTests extends ESTestCase {
}
public void testPageSizeAdapt() throws InterruptedException {
DataFrameTransformConfig config = DataFrameTransformConfigTests.randomDataFrameTransformConfig();
Integer pageSize = randomBoolean() ? null : randomIntBetween(500, 10_000);
DataFrameTransformConfig config = new DataFrameTransformConfig(randomAlphaOfLength(10),
randomSourceConfig(),
randomDestConfig(),
null,
new PivotConfig(GroupConfigTests.randomGroupConfig(), AggregationConfigTests.randomAggregationConfig(), pageSize),
randomBoolean() ? null : randomAlphaOfLengthBetween(1, 1000));
AtomicReference<IndexerState> state = new AtomicReference<>(IndexerState.STOPPED);
final long initialPageSize = pageSize == null ? Pivot.DEFAULT_INITIAL_PAGE_SIZE : pageSize;
Function<SearchRequest, SearchResponse> searchFunction = searchRequest -> {
throw new SearchPhaseExecutionException("query", "Partial shards failure", new ShardSearchFailure[] {
new ShardSearchFailure(new CircuitBreakingException("too much memory", 110, 100, Durability.TRANSIENT)) });
@@ -179,9 +190,7 @@ public class DataFrameIndexerTests extends ESTestCase {
Function<BulkRequest, BulkResponse> bulkFunction = bulkRequest -> new BulkResponse(new BulkItemResponse[0], 100);
Consumer<Exception> failureConsumer = e -> {
fail("expected circuit breaker exception to be handled");
};
Consumer<Exception> failureConsumer = e -> fail("expected circuit breaker exception to be handled");
final ExecutorService executor = Executors.newFixedThreadPool(1);
try {
@@ -197,8 +206,8 @@ public class DataFrameIndexerTests extends ESTestCase {
latch.countDown();
awaitBusy(() -> indexer.getState() == IndexerState.STOPPED);
long pageSizeAfterFirstReduction = indexer.getPageSize();
assertTrue(Pivot.DEFAULT_INITIAL_PAGE_SIZE > pageSizeAfterFirstReduction);
assertTrue(pageSizeAfterFirstReduction > DataFrameIndexer.MINIMUM_PAGE_SIZE);
assertThat(initialPageSize, greaterThan(pageSizeAfterFirstReduction));
assertThat(pageSizeAfterFirstReduction, greaterThan((long)DataFrameIndexer.MINIMUM_PAGE_SIZE));
// run indexer a 2nd time
final CountDownLatch secondRunLatch = indexer.newLatch(1);
@@ -211,8 +220,8 @@ public class DataFrameIndexerTests extends ESTestCase {
awaitBusy(() -> indexer.getState() == IndexerState.STOPPED);
// assert that page size has been reduced again
assertTrue(pageSizeAfterFirstReduction > indexer.getPageSize());
assertTrue(pageSizeAfterFirstReduction > DataFrameIndexer.MINIMUM_PAGE_SIZE);
assertThat(pageSizeAfterFirstReduction, greaterThan((long)indexer.getPageSize()));
assertThat(pageSizeAfterFirstReduction, greaterThan((long)DataFrameIndexer.MINIMUM_PAGE_SIZE));
} finally {
executor.shutdownNow();


@@ -97,6 +97,16 @@ public class PivotTests extends ESTestCase {
assertInvalidTransform(client, source, pivot);
}
public void testInitialPageSize() throws Exception {
int expectedPageSize = 1000;
Pivot pivot = new Pivot(new PivotConfig(GroupConfigTests.randomGroupConfig(), getValidAggregationConfig(), expectedPageSize));
assertThat(pivot.getInitialPageSize(), equalTo(expectedPageSize));
pivot = new Pivot(new PivotConfig(GroupConfigTests.randomGroupConfig(), getValidAggregationConfig(), null));
assertThat(pivot.getInitialPageSize(), equalTo(Pivot.DEFAULT_INITIAL_PAGE_SIZE));
}
public void testSearchFailure() throws Exception {
// test a failure during the search operation, transform creation fails if
// search has failures although they might just be temporary
@@ -167,11 +177,11 @@
}
private PivotConfig getValidPivotConfig() throws IOException {
return new PivotConfig(GroupConfigTests.randomGroupConfig(), getValidAggregationConfig());
return new PivotConfig(GroupConfigTests.randomGroupConfig(), getValidAggregationConfig(), null);
}
private PivotConfig getValidPivotConfig(AggregationConfig aggregationConfig) throws IOException {
return new PivotConfig(GroupConfigTests.randomGroupConfig(), aggregationConfig);
return new PivotConfig(GroupConfigTests.randomGroupConfig(), aggregationConfig, null);
}
private AggregationConfig getValidAggregationConfig() throws IOException {


@@ -303,6 +303,36 @@ setup:
}
}
---
"Test put config with invalid pivot size":
- do:
catch: /pivot\.max_page_search_size \[5\] must be between 10 and 10,000/
data_frame.put_data_frame_transform:
transform_id: "airline-transform"
body: >
{
"source": { "index": "airline-data" },
"dest": { "index": "airline-dest-index" },
"pivot": {
"max_page_search_size": 5,
"group_by": { "airline": {"terms": {"field": "airline"}}},
"aggs": {"avg_response": {"avg": {"field": "responsetime"}}}
}
}
- do:
catch: /pivot\.max_page_search_size \[15000\] must be between 10 and 10,000/
data_frame.put_data_frame_transform:
transform_id: "airline-transform"
body: >
{
"source": { "index": "airline-data" },
"dest": { "index": "airline-dest-index" },
"pivot": {
"max_page_search_size": 15000,
"group_by": { "airline": {"terms": {"field": "airline"}}},
"aggs": {"avg_response": {"avg": {"field": "responsetime"}}}
}
}
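For contrast with the two rejected requests above, a value inside the accepted range goes through. A sketch using the low-level REST client (client setup assumed; endpoint as of the 7.x data frame preview API):

// Same body as the tests above, but with max_page_search_size inside [10, 10_000].
Request request = new Request("PUT", "/_data_frame/transforms/airline-transform");
request.setJsonEntity("{"
    + "\"source\": { \"index\": \"airline-data\" },"
    + "\"dest\": { \"index\": \"airline-dest-index\" },"
    + "\"pivot\": {"
    + "  \"max_page_search_size\": 500,"
    + "  \"group_by\": { \"airline\": { \"terms\": { \"field\": \"airline\" } } },"
    + "  \"aggs\": { \"avg_response\": { \"avg\": { \"field\": \"responsetime\" } } }"
    + "}"
    + "}");
Response response = restClient.performRequest(request); // restClient: a configured RestClient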
---
"Test creation failures due to duplicate and conflicting field names":
- do:
catch: /duplicate field \[airline\] detected/