From 83c328f125a66ba7e63c8b228bb67ad5da1fef43 Mon Sep 17 00:00:00 2001 From: Nik Everett Date: Thu, 9 Apr 2020 14:13:47 -0400 Subject: [PATCH] Deprecate serializing PipelineAggregators (#54926) (#55025) `PipelineAggregator`s are only sent across the wire for backwards compatibility with 7.7.0. `PipelineAggregator` needs to continue to implement `NamedWriteable` for backwards compatibility but pipeline aggregations created after 7.7.0 need not implement any of the methods in that interface because we'll never attempt to call them. So this creates implementations in `PipelineAggregator` (the base class) that just throw exceptions. --- .../elasticsearch/plugins/SearchPlugin.java | 49 ++++++++++++++++++- .../elasticsearch/search/SearchModule.java | 6 ++- .../aggregations/AggregatorFactories.java | 29 +++++------ .../pipeline/PipelineAggregator.java | 49 ++++++++++++++++--- 4 files changed, 104 insertions(+), 29 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/plugins/SearchPlugin.java b/server/src/main/java/org/elasticsearch/plugins/SearchPlugin.java index fe5c65d6704..e12e2cb604e 100644 --- a/server/src/main/java/org/elasticsearch/plugins/SearchPlugin.java +++ b/server/src/main/java/org/elasticsearch/plugins/SearchPlugin.java @@ -349,8 +349,47 @@ public interface SearchPlugin { class PipelineAggregationSpec extends SearchExtensionSpec> { private final Map> resultReaders = new TreeMap<>(); + /** + * Read the aggregator from a stream. + * @deprecated Pipelines implemented after 7.8.0 do not need to be sent across the wire + */ + @Deprecated private final Writeable.Reader aggregatorReader; + /** + * Specification of a {@link PipelineAggregator}. + * + * @param name holds the names by which this aggregation might be parsed. The {@link ParseField#getPreferredName()} is special as it + * is the name by under which the readers are registered. So it is the name that the {@link PipelineAggregationBuilder} and + * {@link PipelineAggregator} should return from {@link NamedWriteable#getWriteableName()}. + * @param builderReader the reader registered for this aggregation's builder. Typically a reference to a constructor that takes a + * {@link StreamInput} + * @param parser reads the aggregation builder from XContent + */ + public PipelineAggregationSpec(ParseField name, + Writeable.Reader builderReader, + ContextParser parser) { + super(name, builderReader, parser); + this.aggregatorReader = null; + } + + /** + * Specification of a {@link PipelineAggregator}. + * + * @param name name by which this aggregation might be parsed or deserialized. Make sure it is the name that the + * {@link PipelineAggregationBuilder} and {@link PipelineAggregator} should return from + * {@link NamedWriteable#getWriteableName()}. + * @param builderReader the reader registered for this aggregation's builder. Typically a reference to a constructor that takes a + * {@link StreamInput} + * @param parser reads the aggregation builder from XContent + */ + public PipelineAggregationSpec(String name, + Writeable.Reader builderReader, + ContextParser parser) { + super(name, builderReader, parser); + this.aggregatorReader = null; + } + /** * Specification of a {@link PipelineAggregator}. * @@ -361,7 +400,10 @@ public interface SearchPlugin { * {@link StreamInput} * @param aggregatorReader reads the {@link PipelineAggregator} from a stream * @param parser reads the aggregation builder from XContent + * @deprecated Use {@link PipelineAggregationSpec#PipelineAggregationSpec(ParseField, Writeable.Reader, ContextParser)} for + * pipelines implemented after 7.8.0 */ + @Deprecated public PipelineAggregationSpec(ParseField name, Writeable.Reader builderReader, Writeable.Reader aggregatorReader, @@ -380,7 +422,10 @@ public interface SearchPlugin { * {@link StreamInput} * @param aggregatorReader reads the {@link PipelineAggregator} from a stream * @param parser reads the aggregation builder from XContent + * @deprecated Use {@link PipelineAggregationSpec#PipelineAggregationSpec(String, Writeable.Reader, ContextParser)} for pipelines + * implemented after 7.8.0 */ + @Deprecated public PipelineAggregationSpec(String name, Writeable.Reader builderReader, Writeable.Reader aggregatorReader, @@ -447,8 +492,10 @@ public interface SearchPlugin { } /** - * The reader for the {@link PipelineAggregator}. + * Read the aggregator from a stream. + * @deprecated Pipelines implemented after 7.8.0 do not need to be sent across the wire */ + @Deprecated public Writeable.Reader getAggregatorReader() { return aggregatorReader; } diff --git a/server/src/main/java/org/elasticsearch/search/SearchModule.java b/server/src/main/java/org/elasticsearch/search/SearchModule.java index 9db914f70da..5a8457d8083 100644 --- a/server/src/main/java/org/elasticsearch/search/SearchModule.java +++ b/server/src/main/java/org/elasticsearch/search/SearchModule.java @@ -575,8 +575,10 @@ public class SearchModule { } namedWriteables.add( new NamedWriteableRegistry.Entry(PipelineAggregationBuilder.class, spec.getName().getPreferredName(), spec.getReader())); - namedWriteables.add( - new NamedWriteableRegistry.Entry(PipelineAggregator.class, spec.getName().getPreferredName(), spec.getAggregatorReader())); + if (spec.getAggregatorReader() != null) { + namedWriteables.add(new NamedWriteableRegistry.Entry( + PipelineAggregator.class, spec.getName().getPreferredName(), spec.getAggregatorReader())); + } for (Map.Entry> resultReader : spec.getResultReaders().entrySet()) { namedWriteables .add(new NamedWriteableRegistry.Entry(InternalAggregation.class, resultReader.getKey(), resultReader.getValue())); diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/AggregatorFactories.java b/server/src/main/java/org/elasticsearch/search/aggregations/AggregatorFactories.java index 33d18f211f0..47e9fc4df36 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/AggregatorFactories.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/AggregatorFactories.java @@ -57,6 +57,9 @@ import java.util.regex.Pattern; import static java.util.stream.Collectors.toList; import static java.util.stream.Collectors.toMap; +/** + * An immutable collection of {@link AggregatorFactories}. + */ public class AggregatorFactories { public static final Pattern VALID_AGG_NAME = Pattern.compile("[^\\[\\]>]+"); @@ -155,26 +158,16 @@ public class AggregatorFactories { return factories; } - public static final AggregatorFactories EMPTY = new AggregatorFactories(new AggregatorFactory[0], new ArrayList<>()); + public static final AggregatorFactories EMPTY = new AggregatorFactories(new AggregatorFactory[0]); private AggregatorFactory[] factories; - private List pipelineAggregatorFactories; public static Builder builder() { return new Builder(); } - private AggregatorFactories(AggregatorFactory[] factories, List pipelineAggregators) { + private AggregatorFactories(AggregatorFactory[] factories) { this.factories = factories; - this.pipelineAggregatorFactories = pipelineAggregators; - } - - public List createPipelineAggregators() { - List pipelineAggregators = new ArrayList<>(this.pipelineAggregatorFactories.size()); - for (PipelineAggregationBuilder factory : this.pipelineAggregatorFactories) { - pipelineAggregators.add(factory.create()); - } - return pipelineAggregators; } /** @@ -216,13 +209,16 @@ public class AggregatorFactories { } /** - * @return the number of sub-aggregator factories not including pipeline - * aggregator factories + * @return the number of sub-aggregator factories */ public int countAggregators() { return factories.length; } + /** + * A mutable collection of {@link AggregationBuilder}s and + * {@link PipelineAggregationBuilder}s. + */ public static class Builder implements Writeable, ToXContentObject { private final Set names = new HashSet<>(); @@ -333,16 +329,13 @@ public class AggregatorFactories { if (aggregationBuilders.isEmpty() && pipelineAggregatorBuilders.isEmpty()) { return EMPTY; } - List orderedPipelineAggregators = - resolvePipelineAggregatorOrder(pipelineAggregatorBuilders, aggregationBuilders); AggregatorFactory[] aggFactories = new AggregatorFactory[aggregationBuilders.size()]; - int i = 0; for (AggregationBuilder agg : aggregationBuilders) { aggFactories[i] = agg.build(queryShardContext, parent); ++i; } - return new AggregatorFactories(aggFactories, orderedPipelineAggregators); + return new AggregatorFactories(aggFactories); } private List resolvePipelineAggregatorOrder( diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/PipelineAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/PipelineAggregator.java index 7444b2119e4..a8a01df5752 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/PipelineAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/PipelineAggregator.java @@ -20,6 +20,7 @@ package org.elasticsearch.search.aggregations.pipeline; +import org.elasticsearch.Version; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.io.stream.NamedWriteable; import org.elasticsearch.common.io.stream.StreamInput; @@ -113,22 +114,54 @@ public abstract class PipelineAggregator implements NamedWriteable { /** * Read from a stream. + * @deprecated pipeline aggregations added after 7.8.0 shouldn't call this */ + @Deprecated protected PipelineAggregator(StreamInput in) throws IOException { - name = in.readString(); - bucketsPaths = in.readStringArray(); - metadata = in.readMap(); + if (in.getVersion().before(Version.V_7_8_0)) { + name = in.readString(); + bucketsPaths = in.readStringArray(); + metadata = in.readMap(); + } else { + throw new IllegalStateException("Cannot deserialize pipeline [" + getClass() + "] from before 7.8.0"); + } } + /** + * {@inheritDoc} + * @deprecated pipeline aggregations added after 7.8.0 shouldn't call this + */ @Override + @Deprecated public final void writeTo(StreamOutput out) throws IOException { - out.writeString(name); - out.writeStringArray(bucketsPaths); - out.writeMap(metadata); - doWriteTo(out); + if (out.getVersion().before(Version.V_7_8_0)) { + out.writeString(name); + out.writeStringArray(bucketsPaths); + out.writeMap(metadata); + doWriteTo(out); + } else { + throw new IllegalArgumentException("[" + name + "] is not supported on versions before 7.8.0"); + } + } + + /** + * Write the body of the aggregation to the wire. + * @deprecated pipeline aggregations added after 7.8.0 don't need to implement this + */ + @Deprecated + protected void doWriteTo(StreamOutput out) throws IOException { + } + + /** + * The name of the writeable object. + * @deprecated pipeline aggregations added after 7.8.0 don't need to implement this + */ + @Override + @Deprecated + public String getWriteableName() { + throw new IllegalArgumentException("[" + name + "] is not supported on versions before 7.8.0"); } - protected abstract void doWriteTo(StreamOutput out) throws IOException; public String name() { return name;