bucketsPaths is now held in the Reducer base class, since every Reducer implementation will need it
commit 9357fc4f95
parent 18c2cb64b7
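In practical terms, a concrete Reducer no longer declares, validates, or serializes its own buckets-path field: it forwards the paths to the base constructor and reads them back through bucketsPaths(). A minimal sketch of a hypothetical subclass under the new contract (the class name and the window field are illustrative and not part of this commit; the type/reduce plumbing every Reducer must also implement is omitted for brevity):

import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;

import java.io.IOException;
import java.util.Map;

// Hypothetical subclass: bucketsPaths is handled entirely by the Reducer base class.
public class WindowedReducer extends Reducer {

    private int window; // subclass-specific state only

    public WindowedReducer() { // for Serialisation
    }

    public WindowedReducer(String name, String[] bucketsPaths, int window, Map<String, Object> metaData) {
        super(name, bucketsPaths, metaData); // name, bucketsPaths and metaData now live in Reducer
        this.window = window;
    }

    @Override
    public void doReadFrom(StreamInput in) throws IOException {
        window = in.readInt(); // bucketsPaths is already read by Reducer.readFrom(StreamInput)
    }

    @Override
    public void doWriteTo(StreamOutput out) throws IOException {
        out.writeInt(window); // bucketsPaths is already written by Reducer.writeTo(StreamOutput)
    }
}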
@@ -63,14 +63,16 @@ public abstract class Reducer implements Streamable {
     }
 
-    protected String name;
-    protected Map<String, Object> metaData;
+    private String name;
+    private String[] bucketsPaths;
+    private Map<String, Object> metaData;
 
     protected Reducer() { // for Serialisation
     }
 
-    protected Reducer(String name, Map<String, Object> metaData) {
+    protected Reducer(String name, String[] bucketsPaths, Map<String, Object> metaData) {
         this.name = name;
+        this.bucketsPaths = bucketsPaths;
         this.metaData = metaData;
     }
 
@@ -78,6 +80,10 @@ public abstract class Reducer implements Streamable {
         return name;
     }
 
+    public String[] bucketsPaths() {
+        return bucketsPaths;
+    }
+
     public Map<String, Object> metaData() {
         return metaData;
     }
@@ -89,6 +95,7 @@ public abstract class Reducer implements Streamable {
     @Override
     public final void writeTo(StreamOutput out) throws IOException {
         out.writeString(name);
+        out.writeStringArray(bucketsPaths);
         out.writeMap(metaData);
         doWriteTo(out);
     }
@@ -98,6 +105,7 @@ public abstract class Reducer implements Streamable {
     @Override
     public final void readFrom(StreamInput in) throws IOException {
         name = in.readString();
+        bucketsPaths = in.readStringArray();
         metaData = in.readMap();
         doReadFrom(in);
     }

@@ -31,6 +31,7 @@ public abstract class ReducerFactory {
 
     protected String name;
     protected String type;
+    protected String[] bucketsPaths;
    protected Map<String, Object> metaData;
 
     /**
@@ -41,9 +42,10 @@ public abstract class ReducerFactory {
      * @param type
      *            The aggregation type
      */
-    public ReducerFactory(String name, String type) {
+    public ReducerFactory(String name, String type, String[] bucketsPaths) {
         this.name = name;
         this.type = type;
+        this.bucketsPaths = bucketsPaths;
     }
 
     /**

@@ -29,6 +29,8 @@ import org.elasticsearch.search.aggregations.support.format.ValueFormatter;
 import org.elasticsearch.search.internal.SearchContext;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 
 public class DerivativeParser implements Reducer.Parser {
 
@@ -44,17 +46,29 @@ public class DerivativeParser implements Reducer.Parser {
     public ReducerFactory parse(String reducerName, XContentParser parser, SearchContext context) throws IOException {
         XContentParser.Token token;
         String currentFieldName = null;
-        String bucketsPath = null;
+        String[] bucketsPaths = null;
         String format = null;
 
         while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
             if (token == XContentParser.Token.FIELD_NAME) {
                 currentFieldName = parser.currentName();
             } else if (token == XContentParser.Token.VALUE_STRING) {
-                if (BUCKETS_PATH.match(currentFieldName)) {
-                    bucketsPath = parser.text();
-                } else if (FORMAT.match(currentFieldName)) {
+                if (FORMAT.match(currentFieldName)) {
                     format = parser.text();
+                } else if (BUCKETS_PATH.match(currentFieldName)) {
+                    bucketsPaths = new String[] { parser.text() };
                 } else {
                     throw new SearchParseException(context, "Unknown key for a " + token + " in [" + reducerName + "]: ["
                             + currentFieldName + "].");
                 }
+            } else if (token == XContentParser.Token.START_ARRAY) {
+                if (BUCKETS_PATH.match(currentFieldName)) {
+                    List<String> paths = new ArrayList<>();
+                    while ((token = parser.nextToken()) != XContentParser.Token.END_ARRAY) {
+                        String path = parser.text();
+                        paths.add(path);
+                    }
+                    bucketsPaths = paths.toArray(new String[paths.size()]);
+                } else {
+                    throw new SearchParseException(context, "Unknown key for a " + token + " in [" + reducerName + "]: ["
+                            + currentFieldName + "].");
@@ -64,7 +78,7 @@ public class DerivativeParser implements Reducer.Parser {
             }
         }
 
-        if (bucketsPath == null) {
+        if (bucketsPaths == null) {
             throw new SearchParseException(context, "Missing required field [" + BUCKETS_PATH.getPreferredName()
                     + "] for derivative aggregation [" + reducerName + "]");
         }
|
@ -74,7 +88,7 @@ public class DerivativeParser implements Reducer.Parser {
|
|||
formatter = ValueFormat.Patternable.Number.format(format).formatter();
|
||||
}
|
||||
|
||||
return new DerivativeReducer.Factory(reducerName, bucketsPath, formatter);
|
||||
return new DerivativeReducer.Factory(reducerName, bucketsPaths, formatter);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
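Note on the parser change above: buckets_path may now be given either as a single string or as an array of strings, and both forms end up as a String[]. A simplified, self-contained sketch of that normalization, shown without the XContentParser machinery (the class and method names here are made up for illustration):

import java.util.List;

final class BucketsPathNormalization {

    // "buckets_path": "the_sum"   -> { "the_sum" }
    // "buckets_path": ["a", "b"]  -> { "a", "b" }
    static String[] normalize(Object bucketsPath) {
        if (bucketsPath instanceof String) {
            // single-string form becomes a one-element array
            return new String[] { (String) bucketsPath };
        } else if (bucketsPath instanceof List) {
            // array form is copied element by element
            List<?> paths = (List<?>) bucketsPath;
            String[] result = new String[paths.size()];
            for (int i = 0; i < paths.size(); i++) {
                result[i] = String.valueOf(paths.get(i));
            }
            return result;
        }
        return null; // the parser then rejects this as a missing required field
    }
}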
@@ -41,6 +41,7 @@ import org.elasticsearch.search.aggregations.reducers.ReducerStreams;
 import org.elasticsearch.search.aggregations.support.AggregationContext;
+import org.elasticsearch.search.aggregations.support.AggregationPath;
 import org.elasticsearch.search.aggregations.support.format.ValueFormatter;
 import org.elasticsearch.search.aggregations.support.format.ValueFormatterStreams;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -72,14 +73,12 @@ public class DerivativeReducer extends Reducer {
     };
 
     private ValueFormatter formatter;
-    private String bucketsPath;
 
     public DerivativeReducer() {
     }
 
-    public DerivativeReducer(String name, String bucketsPath, @Nullable ValueFormatter formatter, Map<String, Object> metadata) {
-        super(name, metadata);
-        this.bucketsPath = bucketsPath;
+    public DerivativeReducer(String name, String[] bucketsPaths, @Nullable ValueFormatter formatter, Map<String, Object> metadata) {
+        super(name, bucketsPaths, metadata);
         this.formatter = formatter;
     }
 
@@ -115,7 +114,7 @@ public class DerivativeReducer extends Reducer {
     }
 
     private double resolveBucketValue(InternalHistogram<? extends InternalHistogram.Bucket> histo, InternalHistogram.Bucket bucket) {
-        Object propertyValue = bucket.getProperty(histo.getName(), AggregationPath.parse(bucketsPath)
+        Object propertyValue = bucket.getProperty(histo.getName(), AggregationPath.parse(bucketsPaths()[0])
                 .getPathElementsAsStringList());
         if (propertyValue instanceof Number) {
             return ((Number) propertyValue).doubleValue();
@@ -129,30 +128,27 @@ public class DerivativeReducer extends Reducer {
 
     @Override
     public void doReadFrom(StreamInput in) throws IOException {
-        bucketsPath = in.readString();
         formatter = ValueFormatterStreams.readOptional(in);
     }
 
     @Override
     public void doWriteTo(StreamOutput out) throws IOException {
-        out.writeString(bucketsPath);
         ValueFormatterStreams.writeOptional(formatter, out);
     }
 
     public static class Factory extends ReducerFactory {
 
-        private final String bucketsPath;
         private final ValueFormatter formatter;
 
-        public Factory(String name, String bucketsPath, @Nullable ValueFormatter formatter) {
-            super(name, TYPE.name());
-            this.bucketsPath = bucketsPath;
+        public Factory(String name, String[] bucketsPaths, @Nullable ValueFormatter formatter) {
+            super(name, TYPE.name(), bucketsPaths);
             this.formatter = formatter;
         }
 
         @Override
         protected Reducer createInternal(AggregationContext context, Aggregator parent, boolean collectsFromSingleBucket,
                 Map<String, Object> metaData) throws IOException {
-            return new DerivativeReducer(name, bucketsPath, formatter, metaData);
+            return new DerivativeReducer(name, bucketsPaths, formatter, metaData);
         }
 
     }
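Call sites now pass an array even when there is a single path; DerivativeReducer itself only ever uses bucketsPaths()[0]. A usage sketch against the new Factory signature (the reducer name and path below are illustrative, not from this commit):

// Hypothetical call site for the new String[]-based constructor.
ReducerFactory factory = new DerivativeReducer.Factory(
        "sales_deriv",                    // reducer name (illustrative)
        new String[] { "monthly_sales" }, // buckets path(s); the derivative uses only the first entry
        null);                            // optional @Nullable ValueFormatter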