This change adds an aggregation that can be used to delay the query phase execution on shards with a configurable time: { "aggs": { "delay": { "shard_delay": { "value": "30s" }, "aggs": { "host": { "terms": { "field": "hostname" } } } } } } This test module is built on top of #61954 so the aggregation will be available only within snapshots since this module is not meant to be used in production. Closes #54159
This commit is contained in:
parent
3ed60df59d
commit
4eea602d2d
|
@ -0,0 +1,32 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
apply plugin: 'elasticsearch.esplugin'
|
||||
apply plugin: 'elasticsearch.yaml-rest-test'
|
||||
|
||||
esplugin {
|
||||
description 'A test module that allows to delay aggregations on shards with a configurable time'
|
||||
classname 'org.elasticsearch.search.aggregations.DelayedShardAggregationPlugin'
|
||||
}
|
||||
|
||||
restResources {
|
||||
restApi {
|
||||
includeCore '_common', 'indices', 'index', 'cluster', 'search'
|
||||
}
|
||||
}
|
|
@ -0,0 +1,69 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.elasticsearch.search.aggregations;
|
||||
|
||||
import org.elasticsearch.action.index.IndexRequestBuilder;
|
||||
import org.elasticsearch.action.search.SearchResponse;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.plugins.Plugin;
|
||||
import org.elasticsearch.search.aggregations.bucket.filter.InternalFilter;
|
||||
import org.elasticsearch.search.aggregations.metrics.InternalMax;
|
||||
import org.elasticsearch.search.aggregations.metrics.MaxAggregationBuilder;
|
||||
import org.elasticsearch.test.ESIntegTestCase;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
|
||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.instanceOf;
|
||||
|
||||
public class DelayedShardAggregationIT extends ESIntegTestCase {
|
||||
|
||||
@Override
|
||||
protected Collection<Class<? extends Plugin>> nodePlugins() {
|
||||
return Arrays.asList(DelayedShardAggregationPlugin.class);
|
||||
}
|
||||
|
||||
public void testSimple() throws Exception {
|
||||
assertAcked(client().admin().indices().prepareCreate("index"));
|
||||
float expectedMax = Float.MIN_VALUE;
|
||||
List<IndexRequestBuilder> reqs = new ArrayList<>();
|
||||
for (int i = 0; i < 5; i++) {
|
||||
float rand = randomFloat();
|
||||
expectedMax = Math.max(rand, expectedMax);
|
||||
reqs.add(client().prepareIndex("index").setSource("number", rand));
|
||||
}
|
||||
indexRandom(true, reqs);
|
||||
SearchResponse response = client().prepareSearch("index")
|
||||
.addAggregation(
|
||||
new DelayedShardAggregationBuilder("delay", TimeValue.timeValueMillis(10)).subAggregation(
|
||||
new MaxAggregationBuilder("max").field("number")
|
||||
)
|
||||
)
|
||||
.get();
|
||||
Aggregations aggs = response.getAggregations();
|
||||
assertThat(aggs.get("delay"), instanceOf(InternalFilter.class));
|
||||
InternalFilter filter = aggs.get("delay");
|
||||
InternalMax max = filter.getAggregations().get("max");
|
||||
assertThat((float) max.getValue(), equalTo(expectedMax));
|
||||
}
|
||||
}
|
|
@ -0,0 +1,143 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.search.aggregations;
|
||||
|
||||
import org.elasticsearch.common.ParseField;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.index.query.QueryBuilders;
|
||||
import org.elasticsearch.index.query.QueryShardContext;
|
||||
import org.elasticsearch.search.aggregations.bucket.filter.FilterAggregationBuilder;
|
||||
import org.elasticsearch.search.internal.SearchContext;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
|
||||
public class DelayedShardAggregationBuilder extends AbstractAggregationBuilder<DelayedShardAggregationBuilder> {
|
||||
public static final String NAME = "shard_delay";
|
||||
|
||||
private TimeValue delay;
|
||||
|
||||
public DelayedShardAggregationBuilder(String name, TimeValue delay) {
|
||||
super(name);
|
||||
this.delay = delay;
|
||||
}
|
||||
|
||||
public DelayedShardAggregationBuilder(StreamInput in) throws IOException {
|
||||
super(in);
|
||||
this.delay = in.readTimeValue();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected AggregationBuilder shallowCopy(AggregatorFactories.Builder factoriesBuilder, Map<String, Object> metaData) {
|
||||
return new DelayedShardAggregationBuilder(name, delay);
|
||||
}
|
||||
|
||||
@Override
|
||||
public BucketCardinality bucketCardinality() {
|
||||
return BucketCardinality.ONE;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getType() {
|
||||
return NAME;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doWriteTo(StreamOutput out) throws IOException {
|
||||
out.writeTimeValue(delay);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected XContentBuilder internalXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
builder.startObject();
|
||||
builder.field("value", delay.toString());
|
||||
builder.endObject();
|
||||
return builder;
|
||||
}
|
||||
|
||||
static final ConstructingObjectParser<DelayedShardAggregationBuilder, String> PARSER = new ConstructingObjectParser<>(
|
||||
NAME,
|
||||
false,
|
||||
(args, name) -> new DelayedShardAggregationBuilder(name, TimeValue.parseTimeValue((String) args[0], "value"))
|
||||
);
|
||||
|
||||
static {
|
||||
PARSER.declareString(ConstructingObjectParser.constructorArg(), new ParseField("value"));
|
||||
}
|
||||
|
||||
@Override
|
||||
@SuppressWarnings("unchecked")
|
||||
protected AggregatorFactory doBuild(
|
||||
QueryShardContext queryShardContext,
|
||||
AggregatorFactory parent,
|
||||
AggregatorFactories.Builder subfactoriesBuilder
|
||||
) throws IOException {
|
||||
|
||||
// Disable the request cache
|
||||
queryShardContext.nowInMillis();
|
||||
|
||||
final FilterAggregationBuilder filterAgg = new FilterAggregationBuilder(name, QueryBuilders.matchAllQuery()).subAggregations(
|
||||
subfactoriesBuilder
|
||||
);
|
||||
final AggregatorFactory factory = filterAgg.build(queryShardContext, parent);
|
||||
return new AggregatorFactory(name, queryShardContext, parent, subfactoriesBuilder, metadata) {
|
||||
@Override
|
||||
protected Aggregator createInternal(
|
||||
SearchContext searchContext,
|
||||
Aggregator parent,
|
||||
CardinalityUpperBound cardinality,
|
||||
Map<String, Object> metadata
|
||||
) throws IOException {
|
||||
long start = searchContext.getRelativeTimeInMillis();
|
||||
long sleepTime = Math.min(delay.getMillis(), 100);
|
||||
do {
|
||||
if (searchContext.isCancelled()) {
|
||||
break;
|
||||
}
|
||||
try {
|
||||
Thread.sleep(sleepTime);
|
||||
} catch (InterruptedException e) {
|
||||
throw new IOException(e);
|
||||
}
|
||||
} while (searchContext.getRelativeTimeInMillis() - start < delay.getMillis());
|
||||
return factory.create(searchContext, parent, cardinality);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) return true;
|
||||
if (o == null || getClass() != o.getClass()) return false;
|
||||
if (!super.equals(o)) return false;
|
||||
DelayedShardAggregationBuilder that = (DelayedShardAggregationBuilder) o;
|
||||
return Objects.equals(delay, that.delay);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(super.hashCode(), delay);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,46 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.search.aggregations;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
import org.elasticsearch.plugins.Plugin;
|
||||
import org.elasticsearch.plugins.SearchPlugin;
|
||||
import org.elasticsearch.search.aggregations.bucket.filter.InternalFilter;
|
||||
|
||||
import static java.util.Collections.singletonList;
|
||||
|
||||
/**
|
||||
* Test plugin that allows to delay aggregations on shards with a configurable time
|
||||
*/
|
||||
public class DelayedShardAggregationPlugin extends Plugin implements SearchPlugin {
|
||||
public DelayedShardAggregationPlugin() {}
|
||||
|
||||
@Override
|
||||
public List<AggregationSpec> getAggregations() {
|
||||
return singletonList(
|
||||
new AggregationSpec(
|
||||
DelayedShardAggregationBuilder.NAME,
|
||||
DelayedShardAggregationBuilder::new,
|
||||
DelayedShardAggregationBuilder.PARSER
|
||||
).addResultReader(InternalFilter::new)
|
||||
);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,38 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.elasticsearch.search.aggregations;
|
||||
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.plugins.Plugin;
|
||||
import org.elasticsearch.test.TestGeoShapeFieldMapperPlugin;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
|
||||
public class DelayedShardAggregationBuilderTests extends BaseAggregationTestCase<DelayedShardAggregationBuilder> {
|
||||
@Override
|
||||
protected Collection<Class<? extends Plugin>> getPlugins() {
|
||||
return Arrays.asList(DelayedShardAggregationPlugin.class, TestGeoShapeFieldMapperPlugin.class);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected DelayedShardAggregationBuilder createTestAggregatorBuilder() {
|
||||
return new DelayedShardAggregationBuilder(randomAlphaOfLength(10), TimeValue.timeValueMillis(100));
|
||||
}
|
||||
}
|
|
@ -0,0 +1,37 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.search.aggregations;
|
||||
|
||||
import com.carrotsearch.randomizedtesting.annotations.Name;
|
||||
import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;
|
||||
|
||||
import org.elasticsearch.test.rest.yaml.ClientYamlTestCandidate;
|
||||
import org.elasticsearch.test.rest.yaml.ESClientYamlSuiteTestCase;
|
||||
|
||||
public class DelayedShardAggregationClientYamlTestSuiteIT extends ESClientYamlSuiteTestCase {
|
||||
public DelayedShardAggregationClientYamlTestSuiteIT(@Name("yaml") ClientYamlTestCandidate testCandidate) {
|
||||
super(testCandidate);
|
||||
}
|
||||
|
||||
@ParametersFactory
|
||||
public static Iterable<Object[]> parameters() throws Exception {
|
||||
return ESClientYamlSuiteTestCase.createParameters();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,22 @@
|
|||
# Integration tests for DelayAggregation components
|
||||
#
|
||||
|
||||
---
|
||||
"Delayed Aggs":
|
||||
- do:
|
||||
indices.create:
|
||||
index: test
|
||||
|
||||
- do:
|
||||
search:
|
||||
index: test
|
||||
body:
|
||||
aggs:
|
||||
delay:
|
||||
shard_delay:
|
||||
value: "200ms"
|
||||
|
||||
|
||||
- match: { hits.total.value: 0 }
|
||||
- match: { aggregations.delay.doc_count: 0 }
|
||||
- gt: { took: 100 }
|
Loading…
Reference in New Issue