Deprecate serializing PipelineAggregators (#54926) (#55025)

`PipelineAggregator`s are only sent across the wire for backwards
compatibility with 7.7.0. `PipelineAggregator` needs to continue to
implement `NamedWriteable` for backwards compatibility but pipeline
aggregations created after 7.7.0 need not implement any of the methods
in that interface because we'll never attempt to call them. So this
creates implementations in `PipelineAggregator` (the base class) that
just throw exceptions.
This commit is contained in:
Nik Everett 2020-04-09 14:13:47 -04:00 committed by GitHub
parent 7f38b146b3
commit 83c328f125
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 104 additions and 29 deletions

View File

@ -349,8 +349,47 @@ public interface SearchPlugin {
class PipelineAggregationSpec extends SearchExtensionSpec<PipelineAggregationBuilder,
ContextParser<String, ? extends PipelineAggregationBuilder>> {
private final Map<String, Writeable.Reader<? extends InternalAggregation>> resultReaders = new TreeMap<>();
/**
* Read the aggregator from a stream.
* @deprecated Pipelines implemented after 7.8.0 do not need to be sent across the wire
*/
@Deprecated
private final Writeable.Reader<? extends PipelineAggregator> aggregatorReader;
/**
* Specification of a {@link PipelineAggregator}.
*
* @param name holds the names by which this aggregation might be parsed. The {@link ParseField#getPreferredName()} is special as it
* is the name by under which the readers are registered. So it is the name that the {@link PipelineAggregationBuilder} and
* {@link PipelineAggregator} should return from {@link NamedWriteable#getWriteableName()}.
* @param builderReader the reader registered for this aggregation's builder. Typically a reference to a constructor that takes a
* {@link StreamInput}
* @param parser reads the aggregation builder from XContent
*/
public PipelineAggregationSpec(ParseField name,
Writeable.Reader<? extends PipelineAggregationBuilder> builderReader,
ContextParser<String, ? extends PipelineAggregationBuilder> parser) {
super(name, builderReader, parser);
this.aggregatorReader = null;
}
/**
* Specification of a {@link PipelineAggregator}.
*
* @param name name by which this aggregation might be parsed or deserialized. Make sure it is the name that the
* {@link PipelineAggregationBuilder} and {@link PipelineAggregator} should return from
* {@link NamedWriteable#getWriteableName()}.
* @param builderReader the reader registered for this aggregation's builder. Typically a reference to a constructor that takes a
* {@link StreamInput}
* @param parser reads the aggregation builder from XContent
*/
public PipelineAggregationSpec(String name,
Writeable.Reader<? extends PipelineAggregationBuilder> builderReader,
ContextParser<String, ? extends PipelineAggregationBuilder> parser) {
super(name, builderReader, parser);
this.aggregatorReader = null;
}
/**
* Specification of a {@link PipelineAggregator}.
*
@ -361,7 +400,10 @@ public interface SearchPlugin {
* {@link StreamInput}
* @param aggregatorReader reads the {@link PipelineAggregator} from a stream
* @param parser reads the aggregation builder from XContent
* @deprecated Use {@link PipelineAggregationSpec#PipelineAggregationSpec(ParseField, Writeable.Reader, ContextParser)} for
* pipelines implemented after 7.8.0
*/
@Deprecated
public PipelineAggregationSpec(ParseField name,
Writeable.Reader<? extends PipelineAggregationBuilder> builderReader,
Writeable.Reader<? extends PipelineAggregator> aggregatorReader,
@ -380,7 +422,10 @@ public interface SearchPlugin {
* {@link StreamInput}
* @param aggregatorReader reads the {@link PipelineAggregator} from a stream
* @param parser reads the aggregation builder from XContent
* @deprecated Use {@link PipelineAggregationSpec#PipelineAggregationSpec(String, Writeable.Reader, ContextParser)} for pipelines
* implemented after 7.8.0
*/
@Deprecated
public PipelineAggregationSpec(String name,
Writeable.Reader<? extends PipelineAggregationBuilder> builderReader,
Writeable.Reader<? extends PipelineAggregator> aggregatorReader,
@ -447,8 +492,10 @@ public interface SearchPlugin {
}
/**
* The reader for the {@link PipelineAggregator}.
* Read the aggregator from a stream.
* @deprecated Pipelines implemented after 7.8.0 do not need to be sent across the wire
*/
@Deprecated
public Writeable.Reader<? extends PipelineAggregator> getAggregatorReader() {
return aggregatorReader;
}

View File

@ -575,8 +575,10 @@ public class SearchModule {
}
namedWriteables.add(
new NamedWriteableRegistry.Entry(PipelineAggregationBuilder.class, spec.getName().getPreferredName(), spec.getReader()));
namedWriteables.add(
new NamedWriteableRegistry.Entry(PipelineAggregator.class, spec.getName().getPreferredName(), spec.getAggregatorReader()));
if (spec.getAggregatorReader() != null) {
namedWriteables.add(new NamedWriteableRegistry.Entry(
PipelineAggregator.class, spec.getName().getPreferredName(), spec.getAggregatorReader()));
}
for (Map.Entry<String, Writeable.Reader<? extends InternalAggregation>> resultReader : spec.getResultReaders().entrySet()) {
namedWriteables
.add(new NamedWriteableRegistry.Entry(InternalAggregation.class, resultReader.getKey(), resultReader.getValue()));

View File

@ -57,6 +57,9 @@ import java.util.regex.Pattern;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toMap;
/**
* An immutable collection of {@link AggregatorFactories}.
*/
public class AggregatorFactories {
public static final Pattern VALID_AGG_NAME = Pattern.compile("[^\\[\\]>]+");
@ -155,26 +158,16 @@ public class AggregatorFactories {
return factories;
}
public static final AggregatorFactories EMPTY = new AggregatorFactories(new AggregatorFactory[0], new ArrayList<>());
public static final AggregatorFactories EMPTY = new AggregatorFactories(new AggregatorFactory[0]);
private AggregatorFactory[] factories;
private List<PipelineAggregationBuilder> pipelineAggregatorFactories;
public static Builder builder() {
return new Builder();
}
private AggregatorFactories(AggregatorFactory[] factories, List<PipelineAggregationBuilder> pipelineAggregators) {
private AggregatorFactories(AggregatorFactory[] factories) {
this.factories = factories;
this.pipelineAggregatorFactories = pipelineAggregators;
}
public List<PipelineAggregator> createPipelineAggregators() {
List<PipelineAggregator> pipelineAggregators = new ArrayList<>(this.pipelineAggregatorFactories.size());
for (PipelineAggregationBuilder factory : this.pipelineAggregatorFactories) {
pipelineAggregators.add(factory.create());
}
return pipelineAggregators;
}
/**
@ -216,13 +209,16 @@ public class AggregatorFactories {
}
/**
* @return the number of sub-aggregator factories not including pipeline
* aggregator factories
* @return the number of sub-aggregator factories
*/
public int countAggregators() {
return factories.length;
}
/**
* A mutable collection of {@link AggregationBuilder}s and
* {@link PipelineAggregationBuilder}s.
*/
public static class Builder implements Writeable, ToXContentObject {
private final Set<String> names = new HashSet<>();
@ -333,16 +329,13 @@ public class AggregatorFactories {
if (aggregationBuilders.isEmpty() && pipelineAggregatorBuilders.isEmpty()) {
return EMPTY;
}
List<PipelineAggregationBuilder> orderedPipelineAggregators =
resolvePipelineAggregatorOrder(pipelineAggregatorBuilders, aggregationBuilders);
AggregatorFactory[] aggFactories = new AggregatorFactory[aggregationBuilders.size()];
int i = 0;
for (AggregationBuilder agg : aggregationBuilders) {
aggFactories[i] = agg.build(queryShardContext, parent);
++i;
}
return new AggregatorFactories(aggFactories, orderedPipelineAggregators);
return new AggregatorFactories(aggFactories);
}
private List<PipelineAggregationBuilder> resolvePipelineAggregatorOrder(

View File

@ -20,6 +20,7 @@
package org.elasticsearch.search.aggregations.pipeline;
import org.elasticsearch.Version;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.NamedWriteable;
import org.elasticsearch.common.io.stream.StreamInput;
@ -113,22 +114,54 @@ public abstract class PipelineAggregator implements NamedWriteable {
/**
* Read from a stream.
* @deprecated pipeline aggregations added after 7.8.0 shouldn't call this
*/
@Deprecated
protected PipelineAggregator(StreamInput in) throws IOException {
name = in.readString();
bucketsPaths = in.readStringArray();
metadata = in.readMap();
if (in.getVersion().before(Version.V_7_8_0)) {
name = in.readString();
bucketsPaths = in.readStringArray();
metadata = in.readMap();
} else {
throw new IllegalStateException("Cannot deserialize pipeline [" + getClass() + "] from before 7.8.0");
}
}
/**
* {@inheritDoc}
* @deprecated pipeline aggregations added after 7.8.0 shouldn't call this
*/
@Override
@Deprecated
public final void writeTo(StreamOutput out) throws IOException {
out.writeString(name);
out.writeStringArray(bucketsPaths);
out.writeMap(metadata);
doWriteTo(out);
if (out.getVersion().before(Version.V_7_8_0)) {
out.writeString(name);
out.writeStringArray(bucketsPaths);
out.writeMap(metadata);
doWriteTo(out);
} else {
throw new IllegalArgumentException("[" + name + "] is not supported on versions before 7.8.0");
}
}
/**
* Write the body of the aggregation to the wire.
* @deprecated pipeline aggregations added after 7.8.0 don't need to implement this
*/
@Deprecated
protected void doWriteTo(StreamOutput out) throws IOException {
}
/**
* The name of the writeable object.
* @deprecated pipeline aggregations added after 7.8.0 don't need to implement this
*/
@Override
@Deprecated
public String getWriteableName() {
throw new IllegalArgumentException("[" + name + "] is not supported on versions before 7.8.0");
}
protected abstract void doWriteTo(StreamOutput out) throws IOException;
public String name() {
return name;