Disallow null/empty or duplicate composite sources ()

Adds some validation to prevent duplicate source names from being
used in the composite agg.

Also refactored to use a ConstructingObjectParser and removed the
private ctor and setter for sources, making it mandatory.
This commit is contained in:
Zachary Tong 2019-04-24 13:22:06 -04:00 committed by Zachary Tong
parent 1db9166ea0
commit ec5dd0594f
5 changed files with 147 additions and 22 deletions

@ -98,6 +98,11 @@ for the aggregation:
==== Values source
The `sources` parameter controls the sources that should be used to build the composite buckets.
The order that the `sources` are defined is important because it also controls the order
the keys are returned.
The name given to each sources must be unique.
There are three different types of values source:
===== Terms

@ -522,3 +522,65 @@ setup:
- match: { aggregations.test.buckets.0.key.long: 1000 }
- match: { aggregations.test.buckets.0.key.kw: "bar" }
- match: { aggregations.test.buckets.0.doc_count: 1 }
"Missing source":
- skip:
version: " - 7.0.99"
reason: null/empty sources disallowed in 7.1
- do:
catch: /Composite \[sources\] cannot be null or empty/
rest_total_hits_as_int: true
index: test
sources: []
- do:
catch: /Required \[sources\]/
rest_total_hits_as_int: true
index: test
size: 1
"Duplicate sources":
- skip:
version: " - 7.0.99"
reason: duplicate names disallowed in 7.1
- do:
catch: /Composite source names must be unique, found duplicates[:] \[keyword\]/
rest_total_hits_as_int: true
index: test
sources: [
"keyword": {
"terms": {
"field": "keyword",
"keyword": {
"terms": {
"field": "keyword",

@ -22,7 +22,7 @@ package org.elasticsearch.search.aggregations.bucket.composite;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.search.aggregations.AbstractAggregationBuilder;
@ -34,9 +34,12 @@ import org.elasticsearch.search.internal.SearchContext;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.Function;
public class CompositeAggregationBuilder extends AbstractAggregationBuilder<CompositeAggregationBuilder> {
public static final String NAME = "composite";
@ -45,29 +48,36 @@ public class CompositeAggregationBuilder extends AbstractAggregationBuilder<Comp
public static final ParseField SIZE_FIELD_NAME = new ParseField("size");
public static final ParseField SOURCES_FIELD_NAME = new ParseField("sources");
private static final ObjectParser<CompositeAggregationBuilder, Void> PARSER;
static {
PARSER = new ObjectParser<>(NAME);
PARSER.declareInt(CompositeAggregationBuilder::size, SIZE_FIELD_NAME);
PARSER.declareObject(CompositeAggregationBuilder::aggregateAfter, (parser, context) -> parser.map(), AFTER_FIELD_NAME);
private static final Function<String, ConstructingObjectParser<CompositeAggregationBuilder, Void>> PARSER = name -> {
ConstructingObjectParser<CompositeAggregationBuilder, Void> parser = new ConstructingObjectParser<>(NAME, a -> {
CompositeAggregationBuilder builder = new CompositeAggregationBuilder(name, (List<CompositeValuesSourceBuilder<?>>)a[0]);
if (a[1] != null) {
if (a[2] != null) {
builder.aggregateAfter((Map<String, Object>)a[2]);
return builder;
(p, c) -> CompositeValuesSourceParserHelper.fromXContent(p), SOURCES_FIELD_NAME);
parser.declareInt(ConstructingObjectParser.optionalConstructorArg(), SIZE_FIELD_NAME);
parser.declareObject(ConstructingObjectParser.optionalConstructorArg(), (p, context) -> p.map(), AFTER_FIELD_NAME);
return parser;
public static CompositeAggregationBuilder parse(String aggregationName, XContentParser parser) throws IOException {
return PARSER.parse(parser, new CompositeAggregationBuilder(aggregationName), null);
return PARSER.apply(aggregationName).parse(parser, null);
private List<CompositeValuesSourceBuilder<?>> sources;
private Map<String, Object> after;
private int size = 10;
private CompositeAggregationBuilder(String name) {
this(name, null);
public CompositeAggregationBuilder(String name, List<CompositeValuesSourceBuilder<?>> sources) {
this.sources = sources;
@ -116,11 +126,6 @@ public class CompositeAggregationBuilder extends AbstractAggregationBuilder<Comp
return NAME;
private CompositeAggregationBuilder setSources(List<CompositeValuesSourceBuilder<?>> sources) {
this.sources = sources;
return this;
* Gets the list of {@link CompositeValuesSourceBuilder} for this aggregation.
@ -167,6 +172,28 @@ public class CompositeAggregationBuilder extends AbstractAggregationBuilder<Comp
private static void validateSources(List<CompositeValuesSourceBuilder<?>> sources) {
if (sources == null || sources.isEmpty()) {
throw new IllegalArgumentException("Composite [" + SOURCES_FIELD_NAME.getPreferredName() + "] cannot be null or empty");
Set<String> names = new HashSet<>();
Set<String> duplicates = new HashSet<>();
sources.forEach(source -> {
if (source == null) {
throw new IllegalArgumentException("Composite source cannot be null");
boolean unique = names.add(source.name());
if (unique == false) {
if (duplicates.size() > 0) {
throw new IllegalArgumentException("Composite source names must be unique, found duplicates: " + duplicates);
protected AggregatorFactory<?> doBuild(SearchContext context, AggregatorFactory<?> parent,
AggregatorFactories.Builder subfactoriesBuilder) throws IOException {

@ -965,7 +965,7 @@ public class CompositeAggregatorTests extends AggregatorTestCase {
new TermsValuesSourceBuilder("keyword").field("keyword"),
new TermsValuesSourceBuilder("long").field("long"),
new TermsValuesSourceBuilder("long").field("double")
new TermsValuesSourceBuilder("double").field("double")
).aggregateAfter(createAfterKey("keyword", "z", "long", 100L, "double", 0.4d))
, (result) -> {
@ -1641,6 +1641,38 @@ public class CompositeAggregatorTests extends AggregatorTestCase {
testRandomTerms("price", () -> randomInt(), (v) -> ((Number) v).intValue());
public void testDuplicateNames() {
IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> {
List<CompositeValuesSourceBuilder<?>> builders = new ArrayList<>();
builders.add(new TermsValuesSourceBuilder("duplicate1").field("bar"));
builders.add(new TermsValuesSourceBuilder("duplicate1").field("baz"));
builders.add(new TermsValuesSourceBuilder("duplicate2").field("bar"));
builders.add(new TermsValuesSourceBuilder("duplicate2").field("baz"));
new CompositeAggregationBuilder("foo", builders);
assertThat(e.getMessage(), equalTo("Composite source names must be unique, found duplicates: [duplicate2, duplicate1]"));
public void testMissingSources() {
IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> {
List<CompositeValuesSourceBuilder<?>> builders = new ArrayList<>();
new CompositeAggregationBuilder("foo", builders);
assertThat(e.getMessage(), equalTo("Composite [sources] cannot be null or empty"));
e = expectThrows(IllegalArgumentException.class, () -> new CompositeAggregationBuilder("foo", null));
assertThat(e.getMessage(), equalTo("Composite [sources] cannot be null or empty"));
public void testNullSourceNonNullCollection() {
IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> {
List<CompositeValuesSourceBuilder<?>> builders = new ArrayList<>();
new CompositeAggregationBuilder("foo", builders);
assertThat(e.getMessage(), equalTo("Composite source cannot be null"));
private <T extends Comparable<T>, V extends Comparable<T>> void testRandomTerms(String field,
Supplier<T> randomSupplier,
Function<Object, V> transformKey) throws IOException {

@ -46,7 +46,6 @@ import java.util.function.Function;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.startsWith;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
@ -302,7 +301,7 @@ public class RollupIndexerStateTests extends ESTestCase {
RollupIndexerJobStats stats = new RollupIndexerJobStats();
RollupIndexerJobStats spyStats = spy(stats);
RollupJobConfig config = mock(RollupJobConfig.class);
RollupJobConfig config = ConfigTestHelpers.randomRollupJobConfig(random());
// We call stats before a final state check, so this allows us to flip the state
// and make sure the appropriate error is thrown