From 4eea602d2d334c3416b0cea5bf1e6051b64d647a Mon Sep 17 00:00:00 2001 From: Jim Ferenczi Date: Tue, 15 Sep 2020 13:52:38 +0200 Subject: [PATCH] Add a snapshot test module to delay shard aggregations (#62082) (#62359) This change adds an aggregation that can be used to delay the query phase execution on shards with a configurable time: { "aggs": { "delay": { "shard_delay": { "value": "30s" }, "aggs": { "host": { "terms": { "field": "hostname" } } } } } } This test module is built on top of #61954 so the aggregation will be available only within snapshots since this module is not meant to be used in production. Closes #54159 --- .../delayed-aggs/build.gradle | 32 ++++ .../DelayedShardAggregationIT.java | 69 +++++++++ .../DelayedShardAggregationBuilder.java | 143 ++++++++++++++++++ .../DelayedShardAggregationPlugin.java | 46 ++++++ .../DelayedShardAggregationBuilderTests.java | 38 +++++ ...ShardAggregationClientYamlTestSuiteIT.java | 37 +++++ .../test/delayed_aggs/10_basic.yml | 22 +++ 7 files changed, 387 insertions(+) create mode 100644 test/external-modules/delayed-aggs/build.gradle create mode 100644 test/external-modules/delayed-aggs/src/internalClusterTest/java/org/elasticsearch/search/aggregations/DelayedShardAggregationIT.java create mode 100644 test/external-modules/delayed-aggs/src/main/java/org/elasticsearch/search/aggregations/DelayedShardAggregationBuilder.java create mode 100644 test/external-modules/delayed-aggs/src/main/java/org/elasticsearch/search/aggregations/DelayedShardAggregationPlugin.java create mode 100644 test/external-modules/delayed-aggs/src/test/java/org/elasticsearch/search/aggregations/DelayedShardAggregationBuilderTests.java create mode 100644 test/external-modules/delayed-aggs/src/yamlRestTest/java/org/elasticsearch/search/aggregations/DelayedShardAggregationClientYamlTestSuiteIT.java create mode 100644 test/external-modules/delayed-aggs/src/yamlRestTest/resources/rest-api-spec/test/delayed_aggs/10_basic.yml diff --git a/test/external-modules/delayed-aggs/build.gradle b/test/external-modules/delayed-aggs/build.gradle new file mode 100644 index 00000000000..d900a92daee --- /dev/null +++ b/test/external-modules/delayed-aggs/build.gradle @@ -0,0 +1,32 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +apply plugin: 'elasticsearch.esplugin' +apply plugin: 'elasticsearch.yaml-rest-test' + +esplugin { + description 'A test module that allows to delay aggregations on shards with a configurable time' + classname 'org.elasticsearch.search.aggregations.DelayedShardAggregationPlugin' +} + +restResources { + restApi { + includeCore '_common', 'indices', 'index', 'cluster', 'search' + } +} diff --git a/test/external-modules/delayed-aggs/src/internalClusterTest/java/org/elasticsearch/search/aggregations/DelayedShardAggregationIT.java b/test/external-modules/delayed-aggs/src/internalClusterTest/java/org/elasticsearch/search/aggregations/DelayedShardAggregationIT.java new file mode 100644 index 00000000000..b8f5c1514e4 --- /dev/null +++ b/test/external-modules/delayed-aggs/src/internalClusterTest/java/org/elasticsearch/search/aggregations/DelayedShardAggregationIT.java @@ -0,0 +1,69 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.search.aggregations; + +import org.elasticsearch.action.index.IndexRequestBuilder; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.search.aggregations.bucket.filter.InternalFilter; +import org.elasticsearch.search.aggregations.metrics.InternalMax; +import org.elasticsearch.search.aggregations.metrics.MaxAggregationBuilder; +import org.elasticsearch.test.ESIntegTestCase; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; + +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.instanceOf; + +public class DelayedShardAggregationIT extends ESIntegTestCase { + + @Override + protected Collection> nodePlugins() { + return Arrays.asList(DelayedShardAggregationPlugin.class); + } + + public void testSimple() throws Exception { + assertAcked(client().admin().indices().prepareCreate("index")); + float expectedMax = Float.MIN_VALUE; + List reqs = new ArrayList<>(); + for (int i = 0; i < 5; i++) { + float rand = randomFloat(); + expectedMax = Math.max(rand, expectedMax); + reqs.add(client().prepareIndex("index").setSource("number", rand)); + } + indexRandom(true, reqs); + SearchResponse response = client().prepareSearch("index") + .addAggregation( + new DelayedShardAggregationBuilder("delay", TimeValue.timeValueMillis(10)).subAggregation( + new MaxAggregationBuilder("max").field("number") + ) + ) + .get(); + Aggregations aggs = response.getAggregations(); + assertThat(aggs.get("delay"), instanceOf(InternalFilter.class)); + InternalFilter filter = aggs.get("delay"); + InternalMax max = filter.getAggregations().get("max"); + assertThat((float) max.getValue(), equalTo(expectedMax)); + } +} diff --git a/test/external-modules/delayed-aggs/src/main/java/org/elasticsearch/search/aggregations/DelayedShardAggregationBuilder.java b/test/external-modules/delayed-aggs/src/main/java/org/elasticsearch/search/aggregations/DelayedShardAggregationBuilder.java new file mode 100644 index 00000000000..fcc80424e65 --- /dev/null +++ b/test/external-modules/delayed-aggs/src/main/java/org/elasticsearch/search/aggregations/DelayedShardAggregationBuilder.java @@ -0,0 +1,143 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.search.aggregations; + +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.ConstructingObjectParser; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.index.query.QueryShardContext; +import org.elasticsearch.search.aggregations.bucket.filter.FilterAggregationBuilder; +import org.elasticsearch.search.internal.SearchContext; + +import java.io.IOException; +import java.util.Map; +import java.util.Objects; + +public class DelayedShardAggregationBuilder extends AbstractAggregationBuilder { + public static final String NAME = "shard_delay"; + + private TimeValue delay; + + public DelayedShardAggregationBuilder(String name, TimeValue delay) { + super(name); + this.delay = delay; + } + + public DelayedShardAggregationBuilder(StreamInput in) throws IOException { + super(in); + this.delay = in.readTimeValue(); + } + + @Override + protected AggregationBuilder shallowCopy(AggregatorFactories.Builder factoriesBuilder, Map metaData) { + return new DelayedShardAggregationBuilder(name, delay); + } + + @Override + public BucketCardinality bucketCardinality() { + return BucketCardinality.ONE; + } + + @Override + public String getType() { + return NAME; + } + + @Override + protected void doWriteTo(StreamOutput out) throws IOException { + out.writeTimeValue(delay); + } + + @Override + protected XContentBuilder internalXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field("value", delay.toString()); + builder.endObject(); + return builder; + } + + static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( + NAME, + false, + (args, name) -> new DelayedShardAggregationBuilder(name, TimeValue.parseTimeValue((String) args[0], "value")) + ); + + static { + PARSER.declareString(ConstructingObjectParser.constructorArg(), new ParseField("value")); + } + + @Override + @SuppressWarnings("unchecked") + protected AggregatorFactory doBuild( + QueryShardContext queryShardContext, + AggregatorFactory parent, + AggregatorFactories.Builder subfactoriesBuilder + ) throws IOException { + + // Disable the request cache + queryShardContext.nowInMillis(); + + final FilterAggregationBuilder filterAgg = new FilterAggregationBuilder(name, QueryBuilders.matchAllQuery()).subAggregations( + subfactoriesBuilder + ); + final AggregatorFactory factory = filterAgg.build(queryShardContext, parent); + return new AggregatorFactory(name, queryShardContext, parent, subfactoriesBuilder, metadata) { + @Override + protected Aggregator createInternal( + SearchContext searchContext, + Aggregator parent, + CardinalityUpperBound cardinality, + Map metadata + ) throws IOException { + long start = searchContext.getRelativeTimeInMillis(); + long sleepTime = Math.min(delay.getMillis(), 100); + do { + if (searchContext.isCancelled()) { + break; + } + try { + Thread.sleep(sleepTime); + } catch (InterruptedException e) { + throw new IOException(e); + } + } while (searchContext.getRelativeTimeInMillis() - start < delay.getMillis()); + return factory.create(searchContext, parent, cardinality); + } + }; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + if (!super.equals(o)) return false; + DelayedShardAggregationBuilder that = (DelayedShardAggregationBuilder) o; + return Objects.equals(delay, that.delay); + } + + @Override + public int hashCode() { + return Objects.hash(super.hashCode(), delay); + } +} diff --git a/test/external-modules/delayed-aggs/src/main/java/org/elasticsearch/search/aggregations/DelayedShardAggregationPlugin.java b/test/external-modules/delayed-aggs/src/main/java/org/elasticsearch/search/aggregations/DelayedShardAggregationPlugin.java new file mode 100644 index 00000000000..387e305c3c3 --- /dev/null +++ b/test/external-modules/delayed-aggs/src/main/java/org/elasticsearch/search/aggregations/DelayedShardAggregationPlugin.java @@ -0,0 +1,46 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.search.aggregations; + +import java.util.List; + +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.plugins.SearchPlugin; +import org.elasticsearch.search.aggregations.bucket.filter.InternalFilter; + +import static java.util.Collections.singletonList; + +/** + * Test plugin that allows to delay aggregations on shards with a configurable time + */ +public class DelayedShardAggregationPlugin extends Plugin implements SearchPlugin { + public DelayedShardAggregationPlugin() {} + + @Override + public List getAggregations() { + return singletonList( + new AggregationSpec( + DelayedShardAggregationBuilder.NAME, + DelayedShardAggregationBuilder::new, + DelayedShardAggregationBuilder.PARSER + ).addResultReader(InternalFilter::new) + ); + } +} diff --git a/test/external-modules/delayed-aggs/src/test/java/org/elasticsearch/search/aggregations/DelayedShardAggregationBuilderTests.java b/test/external-modules/delayed-aggs/src/test/java/org/elasticsearch/search/aggregations/DelayedShardAggregationBuilderTests.java new file mode 100644 index 00000000000..b965dbc9455 --- /dev/null +++ b/test/external-modules/delayed-aggs/src/test/java/org/elasticsearch/search/aggregations/DelayedShardAggregationBuilderTests.java @@ -0,0 +1,38 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.search.aggregations; + +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.test.TestGeoShapeFieldMapperPlugin; + +import java.util.Arrays; +import java.util.Collection; + +public class DelayedShardAggregationBuilderTests extends BaseAggregationTestCase { + @Override + protected Collection> getPlugins() { + return Arrays.asList(DelayedShardAggregationPlugin.class, TestGeoShapeFieldMapperPlugin.class); + } + + @Override + protected DelayedShardAggregationBuilder createTestAggregatorBuilder() { + return new DelayedShardAggregationBuilder(randomAlphaOfLength(10), TimeValue.timeValueMillis(100)); + } +} diff --git a/test/external-modules/delayed-aggs/src/yamlRestTest/java/org/elasticsearch/search/aggregations/DelayedShardAggregationClientYamlTestSuiteIT.java b/test/external-modules/delayed-aggs/src/yamlRestTest/java/org/elasticsearch/search/aggregations/DelayedShardAggregationClientYamlTestSuiteIT.java new file mode 100644 index 00000000000..a0bd53d7f07 --- /dev/null +++ b/test/external-modules/delayed-aggs/src/yamlRestTest/java/org/elasticsearch/search/aggregations/DelayedShardAggregationClientYamlTestSuiteIT.java @@ -0,0 +1,37 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.search.aggregations; + +import com.carrotsearch.randomizedtesting.annotations.Name; +import com.carrotsearch.randomizedtesting.annotations.ParametersFactory; + +import org.elasticsearch.test.rest.yaml.ClientYamlTestCandidate; +import org.elasticsearch.test.rest.yaml.ESClientYamlSuiteTestCase; + +public class DelayedShardAggregationClientYamlTestSuiteIT extends ESClientYamlSuiteTestCase { + public DelayedShardAggregationClientYamlTestSuiteIT(@Name("yaml") ClientYamlTestCandidate testCandidate) { + super(testCandidate); + } + + @ParametersFactory + public static Iterable parameters() throws Exception { + return ESClientYamlSuiteTestCase.createParameters(); + } +} diff --git a/test/external-modules/delayed-aggs/src/yamlRestTest/resources/rest-api-spec/test/delayed_aggs/10_basic.yml b/test/external-modules/delayed-aggs/src/yamlRestTest/resources/rest-api-spec/test/delayed_aggs/10_basic.yml new file mode 100644 index 00000000000..4521536bb6a --- /dev/null +++ b/test/external-modules/delayed-aggs/src/yamlRestTest/resources/rest-api-spec/test/delayed_aggs/10_basic.yml @@ -0,0 +1,22 @@ +# Integration tests for DelayAggregation components +# + +--- +"Delayed Aggs": + - do: + indices.create: + index: test + + - do: + search: + index: test + body: + aggs: + delay: + shard_delay: + value: "200ms" + + + - match: { hits.total.value: 0 } + - match: { aggregations.delay.doc_count: 0 } + - gt: { took: 100 }