[7.x][Transform] add support for missing bucket (#59591) (#60390)

add support for "missing_bucket" in group_by

fixes #42941
fixes #55102
backport #59591
Hendrik Muhs 2020-07-30 08:26:51 +02:00 committed by GitHub
parent 00a1949852
commit aaed6b59d6
36 changed files with 743 additions and 263 deletions
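
Note on the feature: with "missing_bucket" set to true on a group_by source, documents that have no value for the grouping field land in a bucket with a null key instead of being skipped, mirroring the missing_bucket option of the composite aggregation that drives transforms. A minimal sketch of the new client-side API (assuming TermsGroupSource exposes the same builder() factory as its sibling group sources; the surrounding PivotConfig/TransformConfig wiring is omitted):

import org.elasticsearch.client.transform.transforms.pivot.TermsGroupSource;

// Group by user_id, keeping documents without a user_id in a null-key bucket.
TermsGroupSource groupBy = TermsGroupSource.builder()
    .setField("user_id")
    .setMissingBucket(true)
    .build();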

View File

@ -188,9 +188,10 @@ public class DateHistogramGroupSource extends SingleGroupSource implements ToXCo
(args) -> {
String field = (String) args[0];
Script script = (Script) args[1];
String fixedInterval = (String) args[2];
String calendarInterval = (String) args[3];
ZoneId zoneId = (ZoneId) args[4];
boolean missingBucket = args[2] == null ? false : (boolean) args[2];
String fixedInterval = (String) args[3];
String calendarInterval = (String) args[4];
ZoneId zoneId = (ZoneId) args[5];
Interval interval = null;
@ -204,13 +205,14 @@ public class DateHistogramGroupSource extends SingleGroupSource implements ToXCo
throw new IllegalArgumentException("You must specify either fixed_interval or calendar_interval, found none");
}
return new DateHistogramGroupSource(field, script, interval, zoneId);
return new DateHistogramGroupSource(field, script, missingBucket, interval, zoneId);
}
);
static {
PARSER.declareString(optionalConstructorArg(), FIELD);
Script.declareScript(PARSER, optionalConstructorArg(), SCRIPT);
PARSER.declareBoolean(optionalConstructorArg(), MISSING_BUCKET);
PARSER.declareString(optionalConstructorArg(), new ParseField(FixedInterval.NAME));
PARSER.declareString(optionalConstructorArg(), new ParseField(CalendarInterval.NAME));
@ -231,7 +233,11 @@ public class DateHistogramGroupSource extends SingleGroupSource implements ToXCo
private final ZoneId timeZone;
DateHistogramGroupSource(String field, Script script, Interval interval, ZoneId timeZone) {
super(field, script);
this(field, script, false, interval, timeZone);
}
DateHistogramGroupSource(String field, Script script, boolean missingBucket, Interval interval, ZoneId timeZone) {
super(field, script, missingBucket);
this.interval = interval;
this.timeZone = timeZone;
}
@ -273,14 +279,16 @@ public class DateHistogramGroupSource extends SingleGroupSource implements ToXCo
final DateHistogramGroupSource that = (DateHistogramGroupSource) other;
return Objects.equals(this.field, that.field)
return this.missingBucket == that.missingBucket
&& Objects.equals(this.field, that.field)
&& Objects.equals(this.script, that.script)
&& Objects.equals(this.interval, that.interval)
&& Objects.equals(this.timeZone, that.timeZone);
}
@Override
public int hashCode() {
return Objects.hash(field, interval, timeZone);
return Objects.hash(field, script, missingBucket, interval, timeZone);
}
@Override
@ -298,6 +306,7 @@ public class DateHistogramGroupSource extends SingleGroupSource implements ToXCo
private Script script;
private Interval interval;
private ZoneId timeZone;
private boolean missingBucket;
/**
* The field with which to construct the date histogram grouping
@ -339,8 +348,18 @@ public class DateHistogramGroupSource extends SingleGroupSource implements ToXCo
return this;
}
/**
* Sets the value of "missing_bucket"
* @param missingBucket value of "missing_bucket" to be set
* @return The {@link Builder} with "missing_bucket" set.
*/
public Builder setMissingBucket(boolean missingBucket) {
this.missingBucket = missingBucket;
return this;
}
public DateHistogramGroupSource build() {
return new DateHistogramGroupSource(field, script, interval, timeZone);
return new DateHistogramGroupSource(field, script, missingBucket, interval, timeZone);
}
}
}
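
A note on the parser change above: ConstructingObjectParser hands constructor arguments to the lambda positionally, in the order the fields are declared, and optional arguments arrive as null when absent from the parsed JSON. Declaring MISSING_BUCKET right after SCRIPT therefore shifts every later index by one, and the `args[2] == null ? false : (boolean) args[2]` idiom supplies the default. A self-contained sketch of the pattern (class and field names are illustrative):

import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;

import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg;

class ParserSketch {
    static final ConstructingObjectParser<ParserSketch, Void> PARSER = new ConstructingObjectParser<>(
        "sketch",
        true,
        args -> {
            String field = (String) args[0];                                      // declared first
            boolean missingBucket = args[1] == null ? false : (boolean) args[1];  // declared second, defaults to false
            return new ParserSketch(field, missingBucket);
        }
    );

    static {
        PARSER.declareString(optionalConstructorArg(), new ParseField("field"));
        PARSER.declareBoolean(optionalConstructorArg(), new ParseField("missing_bucket"));
    }

    final String field;
    final boolean missingBucket;

    ParserSketch(String field, boolean missingBucket) {
        this.field = field;
        this.missingBucket = missingBucket;
    }
}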

View File

@ -40,16 +40,18 @@ public class GeoTileGroupSource extends SingleGroupSource implements ToXContentO
private static final String NAME = "transform_geo_tile_group";
private static final ParseField PRECISION = new ParseField("precision");
private static final ConstructingObjectParser<GeoTileGroupSource, Void> PARSER = new ConstructingObjectParser<>(NAME, true, (args) -> {
String field = (String) args[0];
Integer precision = (Integer) args[1];
GeoBoundingBox boundingBox = (GeoBoundingBox) args[2];
boolean missingBucket = args[1] == null ? false : (boolean) args[1];
Integer precision = (Integer) args[2];
GeoBoundingBox boundingBox = (GeoBoundingBox) args[3];
return new GeoTileGroupSource(field, precision, boundingBox);
return new GeoTileGroupSource(field, missingBucket, precision, boundingBox);
});
static {
PARSER.declareString(optionalConstructorArg(), FIELD);
PARSER.declareBoolean(optionalConstructorArg(), MISSING_BUCKET);
PARSER.declareInt(optionalConstructorArg(), PRECISION);
PARSER.declareField(
optionalConstructorArg(),
@ -62,7 +64,11 @@ public class GeoTileGroupSource extends SingleGroupSource implements ToXContentO
private final GeoBoundingBox geoBoundingBox;
public GeoTileGroupSource(final String field, final Integer precision, final GeoBoundingBox boundingBox) {
super(field, null);
this(field, false, precision, boundingBox);
}
public GeoTileGroupSource(final String field, final boolean missingBucket, final Integer precision, final GeoBoundingBox boundingBox) {
super(field, null, missingBucket);
if (precision != null) {
GeoTileUtils.checkPrecisionRange(precision);
}
@ -113,14 +119,66 @@ public class GeoTileGroupSource extends SingleGroupSource implements ToXContentO
final GeoTileGroupSource that = (GeoTileGroupSource) other;
return Objects.equals(this.field, that.field)
return this.missingBucket == that.missingBucket
&& Objects.equals(this.field, that.field)
&& Objects.equals(this.precision, that.precision)
&& Objects.equals(this.geoBoundingBox, that.geoBoundingBox);
}
@Override
public int hashCode() {
return Objects.hash(field, precision, geoBoundingBox);
return Objects.hash(field, missingBucket, precision, geoBoundingBox);
}
public static class Builder {
private String field;
private boolean missingBucket;
private Integer precision;
private GeoBoundingBox boundingBox;
/**
* The field with which to construct the geo tile grouping
* @param field The field name
* @return The {@link Builder} with the field set.
*/
public Builder setField(String field) {
this.field = field;
return this;
}
/**
* Sets the value of "missing_bucket"
* @param missingBucket value of "missing_bucket" to be set
* @return The {@link Builder} with "missing_bucket" set.
*/
public Builder setMissingBucket(boolean missingBucket) {
this.missingBucket = missingBucket;
return this;
}
/**
* The precision with which to construct the geo tile grouping
* @param precision The precision
* @return The {@link Builder} with the precision set.
*/
public Builder setPrecision(Integer precision) {
this.precision = precision;
return this;
}
/**
* Set the bounding box for the geo tile grouping
* @param boundingBox The bounding box
* @return the {@link Builder} with the bounding box set.
*/
public Builder setBoundingBox(GeoBoundingBox boundingBox) {
this.boundingBox = boundingBox;
return this;
}
public GeoTileGroupSource build() {
return new GeoTileGroupSource(field, missingBucket, precision, boundingBox);
}
}
}
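
Assumed usage of the Builder added above (field name and precision are illustrative):

GeoTileGroupSource geoGroup = new GeoTileGroupSource.Builder()
    .setField("location")    // geo_point field to bucket into tiles
    .setMissingBucket(true)  // documents without a location get a null-key bucket
    .setPrecision(8)         // zoom level, range-checked via GeoTileUtils.checkPrecisionRange on build
    .build();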

View File

@ -41,12 +41,13 @@ public class HistogramGroupSource extends SingleGroupSource implements ToXConten
private static final ConstructingObjectParser<HistogramGroupSource, Void> PARSER = new ConstructingObjectParser<>(
"histogram_group_source",
true,
args -> new HistogramGroupSource((String) args[0], (Script) args[1], (double) args[2])
args -> new HistogramGroupSource((String) args[0], (Script) args[1], args[2] == null ? false : (boolean) args[2], (double) args[3])
);
static {
PARSER.declareString(optionalConstructorArg(), FIELD);
Script.declareScript(PARSER, optionalConstructorArg(), SCRIPT);
PARSER.declareBoolean(optionalConstructorArg(), MISSING_BUCKET);
PARSER.declareDouble(optionalConstructorArg(), INTERVAL);
}
@ -57,7 +58,11 @@ public class HistogramGroupSource extends SingleGroupSource implements ToXConten
private final double interval;
HistogramGroupSource(String field, Script script, double interval) {
super(field, script);
this(field, script, false, interval);
}
HistogramGroupSource(String field, Script script, boolean missingBucket, double interval) {
super(field, script, missingBucket);
if (interval <= 0) {
throw new IllegalArgumentException("[interval] must be greater than 0.");
}
@ -94,12 +99,15 @@ public class HistogramGroupSource extends SingleGroupSource implements ToXConten
final HistogramGroupSource that = (HistogramGroupSource) other;
return Objects.equals(this.field, that.field) && Objects.equals(this.interval, that.interval);
return this.missingBucket == that.missingBucket
&& Objects.equals(this.field, that.field)
&& Objects.equals(this.script, that.script)
&& Objects.equals(this.interval, that.interval);
}
@Override
public int hashCode() {
return Objects.hash(field, interval);
return Objects.hash(field, script, interval, missingBucket);
}
public static Builder builder() {
@ -110,6 +118,7 @@ public class HistogramGroupSource extends SingleGroupSource implements ToXConten
private String field;
private Script script;
private boolean missingBucket;
private double interval;
/**
@ -123,7 +132,7 @@ public class HistogramGroupSource extends SingleGroupSource implements ToXConten
}
/**
* Set the interval for the histogram aggregation
* Set the interval for the histogram grouping
* @param interval The numeric interval for the histogram grouping
* @return The {@link Builder} with the interval set.
*/
@ -142,8 +151,18 @@ public class HistogramGroupSource extends SingleGroupSource implements ToXConten
return this;
}
/**
* Sets the value of "missing_bucket"
* @param missingBucket value of "missing_bucket" to be set
* @return The {@link Builder} with "missing_bucket" set.
*/
public Builder setMissingBucket(boolean missingBucket) {
this.missingBucket = missingBucket;
return this;
}
public HistogramGroupSource build() {
return new HistogramGroupSource(field, script, interval);
return new HistogramGroupSource(field, script, missingBucket, interval);
}
}
}

View File

@ -32,6 +32,7 @@ public abstract class SingleGroupSource implements ToXContentObject {
protected static final ParseField FIELD = new ParseField("field");
protected static final ParseField SCRIPT = new ParseField("script");
protected static final ParseField MISSING_BUCKET = new ParseField("missing_bucket");
public enum Type {
TERMS,
@ -46,10 +47,12 @@ public abstract class SingleGroupSource implements ToXContentObject {
protected final String field;
protected final Script script;
protected final boolean missingBucket;
public SingleGroupSource(final String field, final Script script) {
public SingleGroupSource(final String field, final Script script, final boolean missingBucket) {
this.field = field;
this.script = script;
this.missingBucket = missingBucket;
}
public abstract Type getType();
@ -62,6 +65,10 @@ public abstract class SingleGroupSource implements ToXContentObject {
return script;
}
public boolean getMissingBucket() {
return missingBucket;
}
protected void innerXContent(XContentBuilder builder, Params params) throws IOException {
if (field != null) {
builder.field(FIELD.getPreferredName(), field);
@ -69,6 +76,9 @@ public abstract class SingleGroupSource implements ToXContentObject {
if (script != null) {
builder.field(SCRIPT.getPreferredName(), script);
}
if (missingBucket) {
builder.field(MISSING_BUCKET.getPreferredName(), missingBucket);
}
}
@Override
@ -83,11 +93,13 @@ public abstract class SingleGroupSource implements ToXContentObject {
final SingleGroupSource that = (SingleGroupSource) other;
return Objects.equals(this.field, that.field) && Objects.equals(this.script, that.script);
return this.missingBucket == that.missingBucket
&& Objects.equals(this.field, that.field)
&& Objects.equals(this.script, that.script);
}
@Override
public int hashCode() {
return Objects.hash(field, script);
return Objects.hash(field, script, missingBucket);
}
}
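
One detail worth calling out in innerXContent above: missing_bucket is only emitted when true, so a group using the default serializes exactly as it did before this change and existing stored configs round-trip unchanged. A sketch, assuming a terms group on user_id named groupBy as in the builder example near the top:

// Strings.toString renders any ToXContentObject to JSON.
// missing_bucket=true  -> {"field":"user_id","missing_bucket":true}
// default (false)      -> {"field":"user_id"}   (the key is omitted entirely)
String json = Strings.toString(groupBy);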

View File

@ -35,12 +35,13 @@ public class TermsGroupSource extends SingleGroupSource implements ToXContentObj
private static final ConstructingObjectParser<TermsGroupSource, Void> PARSER = new ConstructingObjectParser<>(
"terms_group_source",
true,
args -> new TermsGroupSource((String) args[0], (Script) args[1])
args -> new TermsGroupSource((String) args[0], (Script) args[1], args[2] == null ? false : (boolean) args[2])
);
static {
PARSER.declareString(optionalConstructorArg(), FIELD);
Script.declareScript(PARSER, optionalConstructorArg(), SCRIPT);
PARSER.declareBoolean(optionalConstructorArg(), MISSING_BUCKET);
}
public static TermsGroupSource fromXContent(final XContentParser parser) {
@ -48,7 +49,11 @@ public class TermsGroupSource extends SingleGroupSource implements ToXContentObj
}
TermsGroupSource(final String field, final Script script) {
super(field, script);
this(field, script, false);
}
TermsGroupSource(final String field, final Script script, final boolean missingBucket) {
super(field, script, missingBucket);
}
@Override
@ -72,9 +77,10 @@ public class TermsGroupSource extends SingleGroupSource implements ToXContentObj
private String field;
private Script script;
private boolean missingBucket;
/**
* The field with which to construct the date histogram grouping
* The field with which to construct the terms grouping
* @param field The field name
* @return The {@link Builder} with the field set.
*/
@ -93,8 +99,18 @@ public class TermsGroupSource extends SingleGroupSource implements ToXContentObj
return this;
}
/**
* Sets the value of "missing_bucket"
* @param missingBucket value of "missing_bucket" to be set
* @return The {@link Builder} with "missing_bucket" set.
*/
public Builder setMissingBucket(boolean missingBucket) {
this.missingBucket = missingBucket;
return this;
}
public TermsGroupSource build() {
return new TermsGroupSource(field, script);
return new TermsGroupSource(field, script, missingBucket);
}
}
}

View File

@ -41,7 +41,13 @@ public class DateHistogramGroupSourceTests extends AbstractXContentTestCase<Date
String field = randomAlphaOfLengthBetween(1, 20);
Script script = randomBoolean() ? new Script(randomAlphaOfLengthBetween(1, 10)) : null;
return new DateHistogramGroupSource(field, script, randomDateHistogramInterval(), randomBoolean() ? randomZone() : null);
return new DateHistogramGroupSource(
field,
script,
randomBoolean(),
randomDateHistogramInterval(),
randomBoolean() ? randomZone() : null
);
}
@Override

View File

@ -36,11 +36,14 @@ public class GeoTileGroupSourceTests extends AbstractXContentTestCase<GeoTileGro
Rectangle rectangle = GeometryTestUtils.randomRectangle();
return new GeoTileGroupSource(
randomBoolean() ? null : randomAlphaOfLength(10),
randomBoolean(),
randomBoolean() ? null : randomIntBetween(1, GeoTileUtils.MAX_ZOOM),
randomBoolean() ? null : new GeoBoundingBox(
new GeoPoint(rectangle.getMaxLat(), rectangle.getMinLon()),
new GeoPoint(rectangle.getMinLat(), rectangle.getMaxLon())
)
randomBoolean()
? null
: new GeoBoundingBox(
new GeoPoint(rectangle.getMaxLat(), rectangle.getMinLon()),
new GeoPoint(rectangle.getMinLat(), rectangle.getMaxLon())
)
);
}

View File

@ -45,7 +45,7 @@ public class GroupConfigTests extends AbstractXContentTestCase<GroupConfig> {
for (int i = 0; i < randomIntBetween(1, 4); ++i) {
String targetFieldName = randomAlphaOfLengthBetween(1, 20);
if (names.add(targetFieldName)) {
SingleGroupSource groupBy;
SingleGroupSource groupBy = null;
SingleGroupSource.Type type = randomFrom(SingleGroupSource.Type.values());
switch (type) {
case TERMS:
@ -58,8 +58,10 @@ public class GroupConfigTests extends AbstractXContentTestCase<GroupConfig> {
groupBy = DateHistogramGroupSourceTests.randomDateHistogramGroupSource();
break;
case GEOTILE_GRID:
default:
groupBy = GeoTileGroupSourceTests.randomGeoTileGroupSource();
break;
default:
fail("unknown group source type, please implement tests and add support here");
}
groups.put(targetFieldName, groupBy);
}
@ -109,8 +111,11 @@ public class GroupConfigTests extends AbstractXContentTestCase<GroupConfig> {
+ " ]"
+ "}"
);
XContentParser parser = JsonXContent.jsonXContent
.createParser(NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION, json.streamInput());
XContentParser parser = JsonXContent.jsonXContent.createParser(
NamedXContentRegistry.EMPTY,
DeprecationHandler.THROW_UNSUPPORTED_OPERATION,
json.streamInput()
);
GroupConfig gc = GroupConfig.fromXContent(parser);
@ -138,8 +143,11 @@ public class GroupConfigTests extends AbstractXContentTestCase<GroupConfig> {
+ " }"
+ "}"
);
XContentParser parser = JsonXContent.jsonXContent
.createParser(NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION, json.streamInput());
XContentParser parser = JsonXContent.jsonXContent.createParser(
NamedXContentRegistry.EMPTY,
DeprecationHandler.THROW_UNSUPPORTED_OPERATION,
json.streamInput()
);
GroupConfig gc = GroupConfig.fromXContent(parser);

View File

@ -31,8 +31,9 @@ public class HistogramGroupSourceTests extends AbstractXContentTestCase<Histogra
public static HistogramGroupSource randomHistogramGroupSource() {
String field = randomAlphaOfLengthBetween(1, 20);
Script script = randomBoolean() ? new Script(randomAlphaOfLengthBetween(1, 10)) : null;
boolean missingBucket = randomBoolean();
double interval = randomDoubleBetween(Math.nextUp(0), Double.MAX_VALUE, false);
return new HistogramGroupSource(field, script, interval);
return new HistogramGroupSource(field, script, missingBucket, interval);
}
@Override

View File

@ -30,7 +30,7 @@ public class TermsGroupSourceTests extends AbstractXContentTestCase<TermsGroupSo
public static TermsGroupSource randomTermsGroupSource() {
Script script = randomBoolean() ? new Script(randomAlphaOfLengthBetween(1, 10)) : null;
return new TermsGroupSource(randomAlphaOfLengthBetween(1, 20), script);
return new TermsGroupSource(randomAlphaOfLengthBetween(1, 20), script, randomBoolean());
}
@Override

View File

@ -74,6 +74,7 @@ public class DateHistogramGroupSourceTests extends AbstractResponseTestCase<
dateHistogramGroupSource = new DateHistogramGroupSource(
field,
scriptConfig,
randomBoolean(),
new DateHistogramGroupSource.FixedInterval(new DateHistogramInterval(randomTimeValue(1, 100, "d", "h", "ms", "s", "m"))),
randomBoolean() ? randomZone() : null
);
@ -81,6 +82,7 @@ public class DateHistogramGroupSourceTests extends AbstractResponseTestCase<
dateHistogramGroupSource = new DateHistogramGroupSource(
field,
scriptConfig,
randomBoolean(),
new DateHistogramGroupSource.CalendarInterval(new DateHistogramInterval(randomTimeValue(1, 1, "m", "h", "d", "w"))),
randomBoolean() ? randomZone() : null
);

View File

@ -29,7 +29,6 @@ import org.elasticsearch.geometry.Rectangle;
import org.elasticsearch.search.aggregations.bucket.geogrid.GeoTileUtils;
import org.elasticsearch.xpack.core.transform.transforms.pivot.GeoTileGroupSource;
import static org.hamcrest.Matchers.equalTo;
public class GeoTileGroupSourceTests extends AbstractResponseTestCase<
@ -40,11 +39,14 @@ public class GeoTileGroupSourceTests extends AbstractResponseTestCase<
Rectangle rectangle = GeometryTestUtils.randomRectangle();
return new GeoTileGroupSource(
randomBoolean() ? null : randomAlphaOfLength(10),
randomBoolean(),
randomBoolean() ? null : randomIntBetween(1, GeoTileUtils.MAX_ZOOM),
randomBoolean() ? null : new GeoBoundingBox(
new GeoPoint(rectangle.getMaxLat(), rectangle.getMinLon()),
new GeoPoint(rectangle.getMinLat(), rectangle.getMaxLon())
)
randomBoolean()
? null
: new GeoBoundingBox(
new GeoPoint(rectangle.getMaxLat(), rectangle.getMinLon()),
new GeoPoint(rectangle.getMinLat(), rectangle.getMaxLon())
)
);
}

View File

@ -36,9 +36,9 @@ public class HistogramGroupSourceTests extends AbstractResponseTestCase<
public static HistogramGroupSource randomHistogramGroupSource() {
String field = randomBoolean() ? null : randomAlphaOfLengthBetween(1, 20);
ScriptConfig scriptConfig = randomBoolean() ? null : DateHistogramGroupSourceTests.randomScriptConfig();
boolean missingBucket = randomBoolean();
double interval = randomDoubleBetween(Math.nextUp(0), Double.MAX_VALUE, false);
return new HistogramGroupSource(field, scriptConfig, interval);
return new HistogramGroupSource(field, scriptConfig, missingBucket, interval);
}
@Override

View File

@ -36,8 +36,8 @@ public class TermsGroupSourceTests extends AbstractResponseTestCase<
public static TermsGroupSource randomTermsGroupSource() {
String field = randomBoolean() ? null : randomAlphaOfLengthBetween(1, 20);
ScriptConfig scriptConfig = randomBoolean() ? null : DateHistogramGroupSourceTests.randomScriptConfig();
return new TermsGroupSource(field, scriptConfig);
boolean missingBucket = randomBoolean();
return new TermsGroupSource(field, scriptConfig, missingBucket);
}
@Override

View File

@ -208,8 +208,8 @@ public class DateHistogramGroupSource extends SingleGroupSource {
private final ZoneId timeZone;
private final Rounding.Prepared rounding;
public DateHistogramGroupSource(String field, ScriptConfig scriptConfig, Interval interval, ZoneId timeZone) {
super(field, scriptConfig);
public DateHistogramGroupSource(String field, ScriptConfig scriptConfig, boolean missingBucket, Interval interval, ZoneId timeZone) {
super(field, scriptConfig, missingBucket);
this.interval = interval;
this.timeZone = timeZone;
rounding = buildRounding();
@ -245,9 +245,10 @@ public class DateHistogramGroupSource extends SingleGroupSource {
ConstructingObjectParser<DateHistogramGroupSource, Void> parser = new ConstructingObjectParser<>(NAME, lenient, (args) -> {
String field = (String) args[0];
ScriptConfig scriptConfig = (ScriptConfig) args[1];
String fixedInterval = (String) args[2];
String calendarInterval = (String) args[3];
ZoneId zoneId = (ZoneId) args[4];
boolean missingBucket = args[2] == null ? false : (boolean) args[2];
String fixedInterval = (String) args[3];
String calendarInterval = (String) args[4];
ZoneId zoneId = (ZoneId) args[5];
Interval interval = null;
@ -261,7 +262,7 @@ public class DateHistogramGroupSource extends SingleGroupSource {
throw new IllegalArgumentException("You must specify either fixed_interval or calendar_interval, found none");
}
return new DateHistogramGroupSource(field, scriptConfig, interval, zoneId);
return new DateHistogramGroupSource(field, scriptConfig, missingBucket, interval, zoneId);
});
declareValuesSourceFields(parser, lenient);
@ -336,12 +337,16 @@ public class DateHistogramGroupSource extends SingleGroupSource {
final DateHistogramGroupSource that = (DateHistogramGroupSource) other;
return Objects.equals(this.field, that.field) && Objects.equals(interval, that.interval) && Objects.equals(timeZone, that.timeZone);
return this.missingBucket == that.missingBucket
&& Objects.equals(this.field, that.field)
&& Objects.equals(this.scriptConfig, that.scriptConfig)
&& Objects.equals(this.interval, that.interval)
&& Objects.equals(this.timeZone, that.timeZone);
}
@Override
public int hashCode() {
return Objects.hash(field, interval, timeZone);
return Objects.hash(field, scriptConfig, missingBucket, interval, timeZone);
}
@Override

View File

@ -46,12 +46,14 @@ public class GeoTileGroupSource extends SingleGroupSource {
private static ConstructingObjectParser<GeoTileGroupSource, Void> createParser(boolean lenient) {
ConstructingObjectParser<GeoTileGroupSource, Void> parser = new ConstructingObjectParser<>(NAME, lenient, (args) -> {
String field = (String) args[0];
Integer precision = (Integer) args[1];
GeoBoundingBox boundingBox = (GeoBoundingBox) args[2];
boolean missingBucket = args[1] == null ? false : (boolean) args[1];
Integer precision = (Integer) args[2];
GeoBoundingBox boundingBox = (GeoBoundingBox) args[3];
return new GeoTileGroupSource(field, precision, boundingBox);
return new GeoTileGroupSource(field, missingBucket, precision, boundingBox);
});
parser.declareString(optionalConstructorArg(), FIELD);
parser.declareBoolean(optionalConstructorArg(), MISSING_BUCKET);
parser.declareInt(optionalConstructorArg(), PRECISION);
parser.declareField(
optionalConstructorArg(),
@ -65,8 +67,8 @@ public class GeoTileGroupSource extends SingleGroupSource {
private final Integer precision;
private final GeoBoundingBox geoBoundingBox;
public GeoTileGroupSource(final String field, final Integer precision, final GeoBoundingBox boundingBox) {
super(field, null);
public GeoTileGroupSource(final String field, final boolean missingBucket, final Integer precision, final GeoBoundingBox boundingBox) {
super(field, null, missingBucket);
if (precision != null) {
GeoTileUtils.checkPrecisionRange(precision);
}
@ -135,14 +137,15 @@ public class GeoTileGroupSource extends SingleGroupSource {
final GeoTileGroupSource that = (GeoTileGroupSource) other;
return Objects.equals(this.field, that.field)
return this.missingBucket == that.missingBucket
&& Objects.equals(this.field, that.field)
&& Objects.equals(this.precision, that.precision)
&& Objects.equals(this.geoBoundingBox, that.geoBoundingBox);
}
@Override
public int hashCode() {
return Objects.hash(field, precision, geoBoundingBox);
return Objects.hash(field, missingBucket, precision, geoBoundingBox);
}
@Override

View File

@ -25,8 +25,8 @@ public class HistogramGroupSource extends SingleGroupSource {
private static final ConstructingObjectParser<HistogramGroupSource, Void> LENIENT_PARSER = createParser(true);
private final double interval;
public HistogramGroupSource(String field, ScriptConfig scriptConfig, double interval) {
super(field, scriptConfig);
public HistogramGroupSource(String field, ScriptConfig scriptConfig, boolean missingBucket, double interval) {
super(field, scriptConfig, missingBucket);
if (interval <= 0) {
throw new IllegalArgumentException("[interval] must be greater than 0.");
}
@ -42,8 +42,9 @@ public class HistogramGroupSource extends SingleGroupSource {
ConstructingObjectParser<HistogramGroupSource, Void> parser = new ConstructingObjectParser<>(NAME, lenient, (args) -> {
String field = (String) args[0];
ScriptConfig scriptConfig = (ScriptConfig) args[1];
double interval = (double) args[2];
return new HistogramGroupSource(field, scriptConfig, interval);
boolean missingBucket = args[2] == null ? false : (boolean) args[2];
double interval = (double) args[3];
return new HistogramGroupSource(field, scriptConfig, missingBucket, interval);
});
declareValuesSourceFields(parser, lenient);
parser.declareDouble(optionalConstructorArg(), INTERVAL);
@ -90,12 +91,15 @@ public class HistogramGroupSource extends SingleGroupSource {
final HistogramGroupSource that = (HistogramGroupSource) other;
return Objects.equals(this.field, that.field) && Objects.equals(this.interval, that.interval);
return this.missingBucket == that.missingBucket
&& Objects.equals(this.field, that.field)
&& Objects.equals(this.scriptConfig, that.scriptConfig)
&& Objects.equals(this.interval, that.interval);
}
@Override
public int hashCode() {
return Objects.hash(field, interval);
return Objects.hash(field, scriptConfig, missingBucket, interval);
}
@Override

View File

@ -66,18 +66,22 @@ public abstract class SingleGroupSource implements Writeable, ToXContentObject {
protected static final ParseField FIELD = new ParseField("field");
protected static final ParseField SCRIPT = new ParseField("script");
protected static final ParseField MISSING_BUCKET = new ParseField("missing_bucket");
protected final String field;
protected final ScriptConfig scriptConfig;
protected final boolean missingBucket;
static <T> void declareValuesSourceFields(AbstractObjectParser<? extends SingleGroupSource, T> parser, boolean lenient) {
parser.declareString(optionalConstructorArg(), FIELD);
parser.declareObject(optionalConstructorArg(), (p, c) -> ScriptConfig.fromXContent(p, lenient), SCRIPT);
parser.declareBoolean(optionalConstructorArg(), MISSING_BUCKET);
}
public SingleGroupSource(final String field, final ScriptConfig scriptConfig) {
public SingleGroupSource(final String field, final ScriptConfig scriptConfig, final boolean missingBucket) {
this.field = field;
this.scriptConfig = scriptConfig;
this.missingBucket = missingBucket;
}
public SingleGroupSource(StreamInput in) throws IOException {
@ -87,6 +91,11 @@ public abstract class SingleGroupSource implements Writeable, ToXContentObject {
} else {
scriptConfig = null;
}
if (in.getVersion().onOrAfter(Version.V_7_10_0)) {
missingBucket = in.readBoolean();
} else {
missingBucket = false;
}
}
@Override
@ -104,6 +113,9 @@ public abstract class SingleGroupSource implements Writeable, ToXContentObject {
if (scriptConfig != null) {
builder.field(SCRIPT.getPreferredName(), scriptConfig);
}
if (missingBucket) {
builder.field(MISSING_BUCKET.getPreferredName(), missingBucket);
}
}
@Override
@ -112,6 +124,9 @@ public abstract class SingleGroupSource implements Writeable, ToXContentObject {
if (out.getVersion().onOrAfter(Version.V_7_7_0)) {
out.writeOptionalWriteable(scriptConfig);
}
if (out.getVersion().onOrAfter(Version.V_7_10_0)) {
out.writeBoolean(missingBucket);
}
}
public abstract Type getType();
@ -126,6 +141,10 @@ public abstract class SingleGroupSource implements Writeable, ToXContentObject {
return scriptConfig;
}
public boolean getMissingBucket() {
return missingBucket;
}
@Override
public boolean equals(Object other) {
if (this == other) {
@ -138,12 +157,14 @@ public abstract class SingleGroupSource implements Writeable, ToXContentObject {
final SingleGroupSource that = (SingleGroupSource) other;
return Objects.equals(this.field, that.field) && Objects.equals(this.scriptConfig, that.scriptConfig);
return this.missingBucket == that.missingBucket
&& Objects.equals(this.field, that.field)
&& Objects.equals(this.scriptConfig, that.scriptConfig);
}
@Override
public int hashCode() {
return Objects.hash(field, scriptConfig);
return Objects.hash(field, scriptConfig, missingBucket);
}
@Override
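
The Version.V_7_10_0 gates above follow the usual wire BWC pattern: the read and write sides must check the peer's version symmetrically, otherwise a mixed 7.9/7.10 cluster would desynchronize the stream. A round-trip sketch against an older wire version (groupSource is an assumed TermsGroupSource instance; the pattern matches the serialization tests further down):

try (BytesStreamOutput output = new BytesStreamOutput()) {
    output.setVersion(Version.V_7_9_0);  // pretend the destination node is 7.9
    groupSource.writeTo(output);         // missingBucket is not written
    try (StreamInput in = output.bytes().streamInput()) {
        in.setVersion(Version.V_7_9_0);
        TermsGroupSource roundTripped = new TermsGroupSource(in);
        assertFalse(roundTripped.getMissingBucket()); // defaulted to false on read
    }
}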

View File

@ -25,16 +25,17 @@ public class TermsGroupSource extends SingleGroupSource {
ConstructingObjectParser<TermsGroupSource, Void> parser = new ConstructingObjectParser<>(NAME, lenient, (args) -> {
String field = (String) args[0];
ScriptConfig scriptConfig = (ScriptConfig) args[1];
boolean missingBucket = args[2] == null ? false : (boolean) args[2];
return new TermsGroupSource(field, scriptConfig);
return new TermsGroupSource(field, scriptConfig, missingBucket);
});
SingleGroupSource.declareValuesSourceFields(parser, lenient);
return parser;
}
public TermsGroupSource(final String field, final ScriptConfig scriptConfig) {
super(field, scriptConfig);
public TermsGroupSource(final String field, final ScriptConfig scriptConfig, boolean missingBucket) {
super(field, scriptConfig, missingBucket);
}
public TermsGroupSource(StreamInput in) throws IOException {

View File

@ -34,7 +34,9 @@ public class UpdateTransformsActionResponseTests extends AbstractSerializingTran
}
public void testBWCPre78() throws IOException {
Response newResponse = createTestInstance();
Response newResponse = new Response(
TransformConfigTests.randomTransformConfigWithoutHeaders(Version.V_7_8_0, randomAlphaOfLengthBetween(1, 10))
);
UpdateTransformActionPre78.Response oldResponse = writeAndReadBWCObject(
newResponse,
getNamedWriteableRegistry(),

View File

@ -39,11 +39,11 @@ public class TransformConfigTests extends AbstractSerializingTransformTestCase<T
return randomTransformConfigWithoutHeaders(randomAlphaOfLengthBetween(1, 10));
}
public static TransformConfig randomTransformConfig() {
return randomTransformConfig(randomAlphaOfLengthBetween(1, 10));
public static TransformConfig randomTransformConfigWithoutHeaders(String id) {
return randomTransformConfigWithoutHeaders(Version.CURRENT, id);
}
public static TransformConfig randomTransformConfigWithoutHeaders(String id) {
public static TransformConfig randomTransformConfigWithoutHeaders(Version version, String id) {
return new TransformConfig(
id,
randomSourceConfig(),
@ -51,7 +51,7 @@ public class TransformConfigTests extends AbstractSerializingTransformTestCase<T
randomBoolean() ? null : TimeValue.timeValueMillis(randomIntBetween(1_000, 3_600_000)),
randomBoolean() ? null : randomSyncConfig(),
null,
PivotConfigTests.randomPivotConfig(),
PivotConfigTests.randomPivotConfig(version),
randomBoolean() ? null : randomAlphaOfLengthBetween(1, 1000),
SettingsConfigTests.randomSettingsConfig(),
null,
@ -59,7 +59,15 @@ public class TransformConfigTests extends AbstractSerializingTransformTestCase<T
);
}
public static TransformConfig randomTransformConfig() {
return randomTransformConfig(randomAlphaOfLengthBetween(1, 10));
}
public static TransformConfig randomTransformConfig(String id) {
return randomTransformConfig(Version.CURRENT, id);
}
public static TransformConfig randomTransformConfig(Version version, String id) {
return new TransformConfig(
id,
randomSourceConfig(),
@ -67,7 +75,7 @@ public class TransformConfigTests extends AbstractSerializingTransformTestCase<T
randomBoolean() ? null : TimeValue.timeValueMillis(randomIntBetween(1_000, 3_600_000)),
randomBoolean() ? null : randomSyncConfig(),
randomHeaders(),
PivotConfigTests.randomPivotConfig(),
PivotConfigTests.randomPivotConfig(version),
randomBoolean() ? null : randomAlphaOfLengthBetween(1, 1000),
randomBoolean() ? null : SettingsConfigTests.randomSettingsConfig(),
randomBoolean() ? null : Instant.now(),

View File

@ -15,6 +15,7 @@ import org.elasticsearch.common.time.DateFormatters;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
import org.elasticsearch.test.AbstractSerializingTestCase;
import org.elasticsearch.test.VersionUtils;
import java.io.IOException;
import java.time.ZoneOffset;
@ -25,13 +26,22 @@ import static org.hamcrest.Matchers.equalTo;
public class DateHistogramGroupSourceTests extends AbstractSerializingTestCase<DateHistogramGroupSource> {
public static DateHistogramGroupSource randomDateHistogramGroupSource() {
return randomDateHistogramGroupSource(Version.CURRENT);
}
public static DateHistogramGroupSource randomDateHistogramGroupSource(Version version) {
String field = randomBoolean() ? null : randomAlphaOfLengthBetween(1, 20);
ScriptConfig scriptConfig = randomBoolean() ? null : ScriptConfigTests.randomScriptConfig();
ScriptConfig scriptConfig = version.onOrAfter(Version.V_7_7_0)
? randomBoolean() ? null : ScriptConfigTests.randomScriptConfig()
: null;
boolean missingBucket = version.onOrAfter(Version.V_7_10_0) ? randomBoolean() : false;
DateHistogramGroupSource dateHistogramGroupSource;
if (randomBoolean()) {
dateHistogramGroupSource = new DateHistogramGroupSource(
field,
scriptConfig,
missingBucket,
new DateHistogramGroupSource.FixedInterval(new DateHistogramInterval(randomTimeValue(1, 100, "d", "h", "ms", "s", "m"))),
randomBoolean() ? randomZone() : null
);
@ -39,6 +49,7 @@ public class DateHistogramGroupSourceTests extends AbstractSerializingTestCase<D
dateHistogramGroupSource = new DateHistogramGroupSource(
field,
scriptConfig,
missingBucket,
new DateHistogramGroupSource.CalendarInterval(
new DateHistogramInterval(randomTimeValue(1, 1, "m", "h", "d", "w", "M", "q", "y"))
),
@ -49,8 +60,12 @@ public class DateHistogramGroupSourceTests extends AbstractSerializingTestCase<D
return dateHistogramGroupSource;
}
public void testBackwardsSerialization() throws IOException {
DateHistogramGroupSource groupSource = randomDateHistogramGroupSource();
public void testBackwardsSerialization72() throws IOException {
// version 7.7 introduced scripts, so test before that
DateHistogramGroupSource groupSource = randomDateHistogramGroupSource(
VersionUtils.randomVersionBetween(random(), Version.V_7_3_0, Version.V_7_7_0)
);
try (BytesStreamOutput output = new BytesStreamOutput()) {
output.setVersion(Version.V_7_2_0);
groupSource.writeTo(output);
@ -82,6 +97,7 @@ public class DateHistogramGroupSourceTests extends AbstractSerializingTestCase<D
DateHistogramGroupSource dateHistogramGroupSource = new DateHistogramGroupSource(
field,
null,
randomBoolean(),
new DateHistogramGroupSource.FixedInterval(new DateHistogramInterval("1d")),
null
);
@ -104,6 +120,7 @@ public class DateHistogramGroupSourceTests extends AbstractSerializingTestCase<D
DateHistogramGroupSource dateHistogramGroupSource = new DateHistogramGroupSource(
field,
null,
randomBoolean(),
new DateHistogramGroupSource.CalendarInterval(new DateHistogramInterval("1w")),
null
);

View File

@ -6,6 +6,7 @@
package org.elasticsearch.xpack.core.transform.transforms.pivot;
import org.elasticsearch.Version;
import org.elasticsearch.common.geo.GeoBoundingBox;
import org.elasticsearch.common.geo.GeoPoint;
import org.elasticsearch.common.io.stream.Writeable.Reader;
@ -20,14 +21,22 @@ import java.io.IOException;
public class GeoTileGroupSourceTests extends AbstractSerializingTestCase<GeoTileGroupSource> {
public static GeoTileGroupSource randomGeoTileGroupSource() {
return randomGeoTileGroupSource(Version.CURRENT);
}
public static GeoTileGroupSource randomGeoTileGroupSource(Version version) {
Rectangle rectangle = GeometryTestUtils.randomRectangle();
boolean missingBucket = version.onOrAfter(Version.V_7_10_0) ? randomBoolean() : false;
return new GeoTileGroupSource(
randomBoolean() ? null : randomAlphaOfLength(10),
missingBucket,
randomBoolean() ? null : randomIntBetween(1, GeoTileUtils.MAX_ZOOM),
randomBoolean() ? null : new GeoBoundingBox(
new GeoPoint(rectangle.getMaxLat(), rectangle.getMinLon()),
new GeoPoint(rectangle.getMinLat(), rectangle.getMaxLon())
)
randomBoolean()
? null
: new GeoBoundingBox(
new GeoPoint(rectangle.getMaxLat(), rectangle.getMinLon()),
new GeoPoint(rectangle.getMinLat(), rectangle.getMaxLon())
)
);
}

View File

@ -6,6 +6,7 @@
package org.elasticsearch.xpack.core.transform.transforms.pivot;
import org.elasticsearch.Version;
import org.elasticsearch.common.ParsingException;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.Writeable.Reader;
@ -32,6 +33,10 @@ public class GroupConfigTests extends AbstractSerializingTestCase<GroupConfig> {
private static final char[] ILLEGAL_FIELD_NAME_CHARACTERS = { '[', ']', '>' };
public static GroupConfig randomGroupConfig() {
return randomGroupConfig(Version.CURRENT);
}
public static GroupConfig randomGroupConfig(Version version) {
Map<String, Object> source = new LinkedHashMap<>();
Map<String, SingleGroupSource> groups = new LinkedHashMap<>();
@ -44,16 +49,16 @@ public class GroupConfigTests extends AbstractSerializingTestCase<GroupConfig> {
Type type = randomFrom(SingleGroupSource.Type.values());
switch (type) {
case TERMS:
groupBy = TermsGroupSourceTests.randomTermsGroupSource();
groupBy = TermsGroupSourceTests.randomTermsGroupSource(version);
break;
case HISTOGRAM:
groupBy = HistogramGroupSourceTests.randomHistogramGroupSource();
groupBy = HistogramGroupSourceTests.randomHistogramGroupSource(version);
break;
case DATE_HISTOGRAM:
groupBy = DateHistogramGroupSourceTests.randomDateHistogramGroupSource();
groupBy = DateHistogramGroupSourceTests.randomDateHistogramGroupSource(version);
break;
case GEOTILE_GRID:
groupBy = GeoTileGroupSourceTests.randomGeoTileGroupSource();
groupBy = GeoTileGroupSourceTests.randomGeoTileGroupSource(version);
break;
default:
fail("unknown group source type, please implement tests and add support here");

View File

@ -6,6 +6,7 @@
package org.elasticsearch.xpack.core.transform.transforms.pivot;
import org.elasticsearch.Version;
import org.elasticsearch.common.io.stream.Writeable.Reader;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.test.AbstractSerializingTestCase;
@ -15,11 +16,17 @@ import java.io.IOException;
public class HistogramGroupSourceTests extends AbstractSerializingTestCase<HistogramGroupSource> {
public static HistogramGroupSource randomHistogramGroupSource() {
String field = randomBoolean() ? null : randomAlphaOfLengthBetween(1, 20);
ScriptConfig scriptConfig = randomBoolean() ? null : ScriptConfigTests.randomScriptConfig();
return randomHistogramGroupSource(Version.CURRENT);
}
public static HistogramGroupSource randomHistogramGroupSource(Version version) {
String field = randomBoolean() ? null : randomAlphaOfLengthBetween(1, 20);
ScriptConfig scriptConfig = version.onOrAfter(Version.V_7_7_0)
? randomBoolean() ? null : ScriptConfigTests.randomScriptConfig()
: null;
boolean missingBucket = version.onOrAfter(Version.V_7_10_0) ? randomBoolean() : false;
double interval = randomDoubleBetween(Math.nextUp(0), Double.MAX_VALUE, false);
return new HistogramGroupSource(field, scriptConfig, interval);
return new HistogramGroupSource(field, scriptConfig, missingBucket, interval);
}
@Override

View File

@ -6,6 +6,7 @@
package org.elasticsearch.xpack.core.transform.transforms.pivot;
import org.elasticsearch.Version;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.Writeable.Reader;
import org.elasticsearch.common.xcontent.DeprecationHandler;
@ -24,16 +25,24 @@ import static org.hamcrest.Matchers.empty;
public class PivotConfigTests extends AbstractSerializingTransformTestCase<PivotConfig> {
public static PivotConfig randomPivotConfigWithDeprecatedFields() {
return randomPivotConfigWithDeprecatedFields(Version.CURRENT);
}
public static PivotConfig randomPivotConfigWithDeprecatedFields(Version version) {
return new PivotConfig(
GroupConfigTests.randomGroupConfig(),
GroupConfigTests.randomGroupConfig(version),
AggregationConfigTests.randomAggregationConfig(),
randomIntBetween(10, 10_000) // deprecated
);
}
public static PivotConfig randomPivotConfig() {
return randomPivotConfig(Version.CURRENT);
}
public static PivotConfig randomPivotConfig(Version version) {
return new PivotConfig(
GroupConfigTests.randomGroupConfig(),
GroupConfigTests.randomGroupConfig(version),
AggregationConfigTests.randomAggregationConfig(),
null // deprecated
);

View File

@ -6,6 +6,7 @@
package org.elasticsearch.xpack.core.transform.transforms.pivot;
import org.elasticsearch.Version;
import org.elasticsearch.common.io.stream.Writeable.Reader;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.test.AbstractSerializingTestCase;
@ -15,10 +16,16 @@ import java.io.IOException;
public class TermsGroupSourceTests extends AbstractSerializingTestCase<TermsGroupSource> {
public static TermsGroupSource randomTermsGroupSource() {
String field = randomBoolean() ? null : randomAlphaOfLengthBetween(1, 20);
ScriptConfig scriptConfig = randomBoolean() ? null : ScriptConfigTests.randomScriptConfig();
return randomTermsGroupSource(Version.CURRENT);
}
return new TermsGroupSource(field, scriptConfig);
public static TermsGroupSource randomTermsGroupSource(Version version) {
String field = randomBoolean() ? null : randomAlphaOfLengthBetween(1, 20);
ScriptConfig scriptConfig = version.onOrAfter(Version.V_7_7_0)
? randomBoolean() ? null : ScriptConfigTests.randomScriptConfig()
: null;
boolean missingBucket = version.onOrAfter(Version.V_7_10_0) ? randomBoolean() : false;
return new TermsGroupSource(field, scriptConfig, missingBucket);
}
@Override

View File

@ -29,10 +29,8 @@ import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.xpack.core.security.authc.support.UsernamePasswordToken.basicAuthHeaderValue;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.nullValue;
@ -97,16 +95,18 @@ public class TransformPivotRestIT extends TransformRestTestCase {
public void testSimpleDataStreamPivot() throws Exception {
String indexName = "reviews_data_stream";
createReviewsIndex(indexName, 1000, "date", true);
createReviewsIndex(indexName, 1000, "date", true, -1, null);
String transformId = "simple_data_stream_pivot";
String transformIndex = "pivot_reviews_data_stream";
setupDataAccessRole(DATA_ACCESS_ROLE, indexName, transformIndex);
createPivotReviewsTransform(transformId,
createPivotReviewsTransform(
transformId,
transformIndex,
null,
null,
BASIC_AUTH_VALUE_TRANSFORM_ADMIN_WITH_SOME_DATA_ACCESS,
indexName);
indexName
);
startAndWaitForTransform(transformId, transformIndex, BASIC_AUTH_VALUE_TRANSFORM_ADMIN_WITH_SOME_DATA_ACCESS);
@ -338,7 +338,7 @@ public class TransformPivotRestIT extends TransformRestTestCase {
public void testContinuousPivot() throws Exception {
String indexName = "continuous_reviews";
createReviewsIndex(indexName);
createReviewsIndex(indexName, 1000, "date", false, 5, "user_id");
String transformId = "simple_continuous_pivot";
String transformIndex = "pivot_reviews_continuous";
setupDataAccessRole(DATA_ACCESS_ROLE, indexName, transformIndex);
@ -360,7 +360,8 @@ public class TransformPivotRestIT extends TransformRestTestCase {
+ " \"group_by\": {"
+ " \"reviewer\": {"
+ " \"terms\": {"
+ " \"field\": \"user_id\""
+ " \"field\": \"user_id\","
+ " \"missing_bucket\": true"
+ " } } },"
+ " \"aggregations\": {"
+ " \"avg_rating\": {"
@ -376,7 +377,10 @@ public class TransformPivotRestIT extends TransformRestTestCase {
assertTrue(indexExists(transformIndex));
// get and check some users
assertOnePivotValue(transformIndex + "/_search?q=reviewer:user_0", 3.776978417);
assertOnePivotValue(transformIndex + "/_search?q=reviewer:user_5", 3.72);
// missing bucket check
assertOnePivotValue(transformIndex + "/_search?q=!_exists_:reviewer", 3.72);
assertOnePivotValue(transformIndex + "/_search?q=reviewer:user_11", 3.846153846);
assertOnePivotValue(transformIndex + "/_search?q=reviewer:user_20", 3.769230769);
assertOnePivotValue(transformIndex + "/_search?q=reviewer:user_26", 3.918918918);
@ -424,6 +428,19 @@ public class TransformPivotRestIT extends TransformRestTestCase {
.append("\",\"timestamp\":")
.append(dateStamp)
.append("}\n");
bulk.append("{\"index\":{\"_index\":\"" + indexName + "\"}}\n");
bulk.append("{")
.append("\"business_id\":\"")
.append("business_")
.append(business)
.append("\",\"stars\":")
.append(stars)
.append(",\"location\":\"")
.append(location)
.append("\",\"timestamp\":")
.append(dateStamp)
.append("}\n");
}
bulk.append("\r\n");
@ -439,20 +456,12 @@ public class TransformPivotRestIT extends TransformRestTestCase {
// assert that other users are unchanged
assertOnePivotValue(transformIndex + "/_search?q=reviewer:user_0", 3.776978417);
assertOnePivotValue(transformIndex + "/_search?q=reviewer:user_5", 3.72);
assertOnePivotValue(transformIndex + "/_search?q=!_exists_:reviewer", 4.36);
assertOnePivotValue(transformIndex + "/_search?q=reviewer:user_11", 3.846153846);
assertOnePivotValue(transformIndex + "/_search?q=reviewer:user_20", 3.769230769);
Map<String, Object> user26searchResult = getAsMap(transformIndex + "/_search?q=reviewer:user_26");
assertEquals(1, XContentMapValues.extractValue("hits.total.value", user26searchResult));
double actual = (Double) ((List<?>) XContentMapValues.extractValue("hits.hits._source.avg_rating", user26searchResult)).get(0);
assertThat(actual, greaterThan(3.92));
Map<String, Object> user42searchResult = getAsMap(transformIndex + "/_search?q=reviewer:user_42");
assertEquals(1, XContentMapValues.extractValue("hits.total.value", user42searchResult));
actual = (Double) ((List<?>) XContentMapValues.extractValue("hits.hits._source.avg_rating", user42searchResult)).get(0);
assertThat(actual, greaterThan(0.0));
assertThat(actual, lessThan(5.0));
assertOnePivotValue(transformIndex + "/_search?q=reviewer:user_26", 4.354838709);
assertOnePivotValue(transformIndex + "/_search?q=reviewer:user_42", 2.0);
}
public void testHistogramPivot() throws Exception {
@ -738,7 +747,7 @@ public class TransformPivotRestIT extends TransformRestTestCase {
+ " \"size\": 2"
+ " }}"
+ " } "
+" },"
+ " },"
+ " \"rare_users\": {"
+ " \"rare_terms\": {"
+ " \"field\": \"user_id\""
@ -769,25 +778,28 @@ public class TransformPivotRestIT extends TransformRestTestCase {
searchResult
)).get(0);
assertThat(commonUsers, is(not(nullValue())));
assertThat(commonUsers, equalTo(new HashMap<String, Object>(){{
put("user_10",
Collections.singletonMap(
"common_businesses",
new HashMap<String, Object>(){{
assertThat(commonUsers, equalTo(new HashMap<String, Object>() {
{
put("user_10", Collections.singletonMap("common_businesses", new HashMap<String, Object>() {
{
put("business_12", 6);
put("business_9", 4);
}}));
put("user_0", Collections.singletonMap(
"common_businesses",
new HashMap<String, Object>(){{
put("business_0", 35);
}}));
}}));
}
}));
put("user_0", Collections.singletonMap("common_businesses", new HashMap<String, Object>() {
{
put("business_0", 35);
}
}));
}
}));
assertThat(rareUsers, is(not(nullValue())));
assertThat(rareUsers, equalTo(new HashMap<String, Object>(){{
put("user_5", 1);
put("user_12", 1);
}}));
assertThat(rareUsers, equalTo(new HashMap<String, Object>() {
{
put("user_5", 1);
put("user_12", 1);
}
}));
}
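
On the !_exists_:reviewer assertions above: documents produced from the null-key bucket simply omit the reviewer field in the destination index, so a negated _exists_ clause in a query_string query selects exactly the missing-bucket row. The equivalent low-level request, for illustration:

Request search = new Request("GET", "/" + transformIndex + "/_search");
search.addParameter("q", "!_exists_:reviewer"); // matches only docs lacking the reviewer field
Response searchResponse = client().performRequest(search);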
private void assertDateHistogramPivot(String indexName) throws Exception {

View File

@ -52,8 +52,7 @@ import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
public class TransformProgressIT extends ESRestTestCase {
protected void createReviewsIndex() throws Exception {
protected void createReviewsIndex(int userWithMissingBuckets) throws Exception {
final int numDocs = 1000;
final RestHighLevelClient restClient = new TestRestHighLevelClient();
@ -99,12 +98,14 @@ public class TransformProgressIT extends ESRestTestCase {
String date_string = "2017-01-" + day + "T" + hour + ":" + min + ":" + sec + "Z";
StringBuilder sourceBuilder = new StringBuilder();
sourceBuilder.append("{\"user_id\":\"")
.append("user_")
.append(user)
.append("\",\"count\":")
.append(i)
.append(",\"business_id\":\"")
sourceBuilder.append("{");
sourceBuilder.append("\"user_id\":\"").append("user_").append(user).append("\",");
if (user != userWithMissingBuckets) {
sourceBuilder.append("\"count\":").append(i).append(",");
}
sourceBuilder.append("\"business_id\":\"")
.append("business_")
.append(business)
.append("\",\"stars\":")
@ -121,18 +122,28 @@ public class TransformProgressIT extends ESRestTestCase {
day += 1;
}
}
restClient.bulk(bulk, RequestOptions.DEFAULT);
BulkResponse bulkResponse = restClient.bulk(bulk, RequestOptions.DEFAULT);
assertFalse(bulkResponse.hasFailures());
restClient.indices().refresh(new RefreshRequest(REVIEWS_INDEX_NAME), RequestOptions.DEFAULT);
}
public void testGetProgress() throws Exception {
assertGetProgress(-1);
}
public void testGetProgressMissingBucket() throws Exception {
assertGetProgress(randomIntBetween(1, 25));
}
public void assertGetProgress(int userWithMissingBuckets) throws Exception {
String transformId = "get_progress_transform";
createReviewsIndex();
boolean missingBucket = userWithMissingBuckets > 0;
createReviewsIndex(userWithMissingBuckets);
SourceConfig sourceConfig = new SourceConfig(REVIEWS_INDEX_NAME);
DestConfig destConfig = new DestConfig("unnecessary", null);
GroupConfig histgramGroupConfig = new GroupConfig(
Collections.emptyMap(),
Collections.singletonMap("every_50", new HistogramGroupSource("count", null, 50.0))
Collections.singletonMap("every_50", new HistogramGroupSource("count", null, missingBucket, 50.0))
);
AggregatorFactories.Builder aggs = new AggregatorFactories.Builder();
aggs.addAggregator(AggregationBuilders.avg("avg_rating").field("stars"));
@ -148,6 +159,12 @@ public class TransformProgressIT extends ESRestTestCase {
assertThat(progress.getDocumentsProcessed(), equalTo(0L));
assertThat(progress.getPercentComplete(), equalTo(0.0));
progress = getProgress(pivot, getProgressQuery(pivot, config.getSource().getIndex(), QueryBuilders.rangeQuery("stars").gte(2)));
assertThat(progress.getTotalDocs(), equalTo(600L));
assertThat(progress.getDocumentsProcessed(), equalTo(0L));
assertThat(progress.getPercentComplete(), equalTo(0.0));
progress = getProgress(
pivot,
getProgressQuery(pivot, config.getSource().getIndex(), QueryBuilders.termQuery("user_id", "user_26"))
@ -159,7 +176,7 @@ public class TransformProgressIT extends ESRestTestCase {
histgramGroupConfig = new GroupConfig(
Collections.emptyMap(),
Collections.singletonMap("every_50", new HistogramGroupSource("missing_field", null, 50.0))
Collections.singletonMap("every_50", new HistogramGroupSource("missing_field", null, missingBucket, 50.0))
);
pivotConfig = new PivotConfig(histgramGroupConfig, aggregationConfig, null);
pivot = new Pivot(pivotConfig, transformId);
@ -169,9 +186,14 @@ public class TransformProgressIT extends ESRestTestCase {
getProgressQuery(pivot, config.getSource().getIndex(), QueryBuilders.termQuery("user_id", "user_26"))
);
assertThat(progress.getTotalDocs(), equalTo(0L));
assertThat(progress.getDocumentsProcessed(), equalTo(0L));
assertThat(progress.getPercentComplete(), equalTo(100.0));
if (missingBucket) {
assertThat(progress.getTotalDocs(), equalTo(35L));
assertThat(progress.getPercentComplete(), equalTo(0.0));
} else {
assertThat(progress.getTotalDocs(), equalTo(0L));
assertThat(progress.getPercentComplete(), equalTo(100.0));
}
deleteIndex(REVIEWS_INDEX_NAME);
}
@ -193,10 +215,7 @@ public class TransformProgressIT extends ESRestTestCase {
function.getInitialProgressFromResponse(
response,
new LatchedActionListener<>(
ActionListener.wrap(progressHolder::set, e -> { exceptionHolder.set(e); }),
latch
)
new LatchedActionListener<>(ActionListener.wrap(progressHolder::set, e -> { exceptionHolder.set(e); }), latch)
);
}

View File

@ -80,14 +80,88 @@ public abstract class TransformRestTestCase extends ESRestTestCase {
return super.buildClient(settings, hosts);
}
protected void createReviewsIndex(String indexName, int numDocs, String dateType, boolean isDataStream) throws IOException {
int[] distributionTable = { 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 4, 4, 4, 3, 3, 2, 1, 1, 1 };
protected void createReviewsIndex(
String indexName,
int numDocs,
String dateType,
boolean isDataStream,
int userWithMissingBuckets,
String missingBucketField
) throws IOException {
putReviewsIndex(indexName, dateType, isDataStream);
int[] distributionTable = { 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 4, 4, 4, 3, 3, 2, 1, 1, 1 };
// create index
final StringBuilder bulk = new StringBuilder();
int day = 10;
int hour = 10;
int min = 10;
for (int i = 0; i < numDocs; i++) {
bulk.append("{\"create\":{\"_index\":\"" + indexName + "\"}}\n");
long user = Math.round(Math.pow(i * 31 % 1000, distributionTable[i % distributionTable.length]) % 27);
int stars = distributionTable[(i * 33) % distributionTable.length];
long business = Math.round(Math.pow(user * stars, distributionTable[i % distributionTable.length]) % 13);
if (i % 12 == 0) {
hour = 10 + (i % 13);
}
if (i % 5 == 0) {
min = 10 + (i % 49);
}
int sec = 10 + (i % 49);
String location = (user + 10) + "," + (user + 15);
String date_string = "2017-01-" + day + "T" + hour + ":" + min + ":" + sec;
if (dateType.equals("date_nanos")) {
String randomNanos = "," + randomIntBetween(100000000, 999999999);
date_string += randomNanos;
}
date_string += "Z";
bulk.append("{");
if ((user == userWithMissingBuckets && missingBucketField.equals("user_id")) == false) {
bulk.append("\"user_id\":\"").append("user_").append(user).append("\",");
}
if ((user == userWithMissingBuckets && missingBucketField.equals("business_id")) == false) {
bulk.append("\"business_id\":\"").append("business_").append(business).append("\",");
}
if ((user == userWithMissingBuckets && missingBucketField.equals("stars")) == false) {
bulk.append("\"stars\":").append(stars).append(",");
}
if ((user == userWithMissingBuckets && missingBucketField.equals("location")) == false) {
bulk.append("\"location\":\"").append(location).append("\",");
}
if ((user == userWithMissingBuckets && missingBucketField.equals("timestamp")) == false) {
bulk.append("\"timestamp\":\"").append(date_string).append("\",");
}
// always add @timestamp to avoid complicated logic regarding ','
bulk.append("\"@timestamp\":\"").append(date_string).append("\"");
bulk.append("}\n");
if (i % 50 == 0) {
bulk.append("\r\n");
final Request bulkRequest = new Request("POST", "/_bulk");
bulkRequest.addParameter("refresh", "true");
bulkRequest.setJsonEntity(bulk.toString());
client().performRequest(bulkRequest);
// clear the builder
bulk.setLength(0);
day += 1;
}
}
bulk.append("\r\n");
final Request bulkRequest = new Request("POST", "/_bulk");
bulkRequest.addParameter("refresh", "true");
bulkRequest.setJsonEntity(bulk.toString());
client().performRequest(bulkRequest);
}
protected void putReviewsIndex(String indexName, String dateType, boolean isDataStream) throws IOException {
// create mapping
try (XContentBuilder builder = jsonBuilder()) {
builder.startObject();
{
builder.startObject("mappings").startObject("properties");
builder.startObject("@timestamp").field("type", dateType);
if (dateType.equals("date_nanos")) {
@ -118,15 +192,18 @@ public abstract class TransformRestTestCase extends ESRestTestCase {
if (isDataStream) {
Request createCompositeTemplate = new Request("PUT", "_index_template/" + indexName + "_template");
createCompositeTemplate.setJsonEntity(
"{\n" +
" \"index_patterns\": [ \"" + indexName + "\" ],\n" +
" \"data_stream\": {\n" +
" },\n" +
" \"template\": \n" + Strings.toString(builder) +
"}"
"{\n"
+ " \"index_patterns\": [ \""
+ indexName
+ "\" ],\n"
+ " \"data_stream\": {\n"
+ " },\n"
+ " \"template\": \n"
+ Strings.toString(builder)
+ "}"
);
client().performRequest(createCompositeTemplate);
client().performRequest(new Request("PUT", "_data_stream/" + indexName));
client().performRequest(new Request("PUT", "_data_stream/" + indexName));
} else {
final StringEntity entity = new StringEntity(Strings.toString(builder), ContentType.APPLICATION_JSON);
Request req = new Request("PUT", indexName);
@ -134,66 +211,6 @@ public abstract class TransformRestTestCase extends ESRestTestCase {
client().performRequest(req);
}
}
// create index
final StringBuilder bulk = new StringBuilder();
int day = 10;
int hour = 10;
int min = 10;
for (int i = 0; i < numDocs; i++) {
bulk.append("{\"create\":{\"_index\":\"" + indexName + "\"}}\n");
long user = Math.round(Math.pow(i * 31 % 1000, distributionTable[i % distributionTable.length]) % 27);
int stars = distributionTable[(i * 33) % distributionTable.length];
long business = Math.round(Math.pow(user * stars, distributionTable[i % distributionTable.length]) % 13);
if (i % 12 == 0) {
hour = 10 + (i % 13);
}
if (i % 5 == 0) {
min = 10 + (i % 49);
}
int sec = 10 + (i % 49);
String location = (user + 10) + "," + (user + 15);
String date_string = "2017-01-" + day + "T" + hour + ":" + min + ":" + sec;
if (dateType.equals("date_nanos")) {
String randomNanos = "," + randomIntBetween(100000000, 999999999);
date_string += randomNanos;
}
date_string += "Z";
bulk.append("{\"user_id\":\"")
.append("user_")
.append(user)
.append("\",\"business_id\":\"")
.append("business_")
.append(business)
.append("\",\"stars\":")
.append(stars)
.append(",\"location\":\"")
.append(location)
.append("\",\"timestamp\":\"")
.append(date_string)
.append("\",\"@timestamp\":\"")
.append(date_string)
.append("\"}\n");
if (i % 50 == 0) {
bulk.append("\r\n");
final Request bulkRequest = new Request("POST", "/_bulk");
bulkRequest.addParameter("refresh", "true");
bulkRequest.setJsonEntity(bulk.toString());
client().performRequest(bulkRequest);
// clear the builder
bulk.setLength(0);
day += 1;
}
}
bulk.append("\r\n");
final Request bulkRequest = new Request("POST", "/_bulk");
bulkRequest.addParameter("refresh", "true");
bulkRequest.setJsonEntity(bulk.toString());
client().performRequest(bulkRequest);
}
/**
@ -204,7 +221,7 @@ public abstract class TransformRestTestCase extends ESRestTestCase {
}
protected void createReviewsIndex(String indexName) throws IOException {
createReviewsIndex(indexName, 1000, "date", false);
createReviewsIndex(indexName, 1000, "date", false, -1, null);
}
protected void createPivotReviewsTransform(String transformId, String transformIndex, String query) throws IOException {
@ -217,7 +234,7 @@ public abstract class TransformRestTestCase extends ESRestTestCase {
}
protected void createReviewsIndexNano() throws IOException {
createReviewsIndex(REVIEWS_DATE_NANO_INDEX_NAME, 1000, "date_nanos", false);
createReviewsIndex(REVIEWS_DATE_NANO_INDEX_NAME, 1000, "date_nanos", false, -1, null);
}
protected void createContinuousPivotReviewsTransform(String transformId, String transformIndex, String authHeader) throws IOException {
@ -247,12 +264,14 @@ public abstract class TransformRestTestCase extends ESRestTestCase {
assertThat(createTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE));
}
protected void createPivotReviewsTransform(String transformId,
String transformIndex,
String query,
String pipeline,
String authHeader,
String sourceIndex) throws IOException {
protected void createPivotReviewsTransform(
String transformId,
String transformIndex,
String query,
String pipeline,
String authHeader,
String sourceIndex
) throws IOException {
final Request createTransformRequest = createRequestWithAuth("PUT", getTransformEndpoint() + transformId, authHeader);
String config = "{";

View File

@ -64,7 +64,7 @@ public class TransformTaskFailedStateIT extends TransformRestTestCase {
public void testForceStopFailedTransform() throws Exception {
String transformId = "test-force-stop-failed-transform";
createReviewsIndex(REVIEWS_INDEX_NAME, 10, "date", false);
createReviewsIndex(REVIEWS_INDEX_NAME, 10, "date", false, -1, null);
String transformIndex = "failure_pivot_reviews";
createDestinationIndexWithBadMapping(transformIndex);
createContinuousPivotReviewsTransform(transformId, transformIndex, null);
@ -102,7 +102,7 @@ public class TransformTaskFailedStateIT extends TransformRestTestCase {
public void testStartFailedTransform() throws Exception {
String transformId = "test-force-start-failed-transform";
createReviewsIndex(REVIEWS_INDEX_NAME, 10, "date", false);
createReviewsIndex(REVIEWS_INDEX_NAME, 10, "date", false, -1, null);
String transformIndex = "failure_pivot_reviews";
createDestinationIndexWithBadMapping(transformIndex);
createContinuousPivotReviewsTransform(transformId, transformIndex, null);

View File

@ -92,6 +92,13 @@ public interface Function {
* @return the position, or null if the collector is exhausted
*/
Map<String, Object> getBucketPosition();
/**
* Whether the collector optimizes change detection by narrowing the required query.
*
* @return true if the collector optimizes change detection
*/
boolean isOptimized();
}
/**

View File

@ -347,6 +347,21 @@ public abstract class TransformIndexer extends AsyncTwoPhaseIndexer<TransformInd
if (isContinuous()) {
changeCollector = function.buildChangeCollector(getConfig().getSyncConfig().getField());
if (changeCollector.isOptimized() == false) {
logger.warn(
new ParameterizedMessage(
"[{}] could not find any optimizations for continuous execution, "
+ "this transform might run slowly, please check your configuration.",
getJobId()
)
);
auditor.warning(
getJobId(),
"could not find any optimizations for continuous execution, "
+ "this transform might run slowly, please check your configuration."
);
}
}
}

View File

@ -13,6 +13,7 @@ import org.elasticsearch.common.Rounding;
import org.elasticsearch.common.geo.GeoPoint;
import org.elasticsearch.geometry.Rectangle;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.ExistsQueryBuilder;
import org.elasticsearch.index.query.GeoBoundingBoxQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
@ -90,18 +91,30 @@ public class CompositeBucketsChangeCollector implements ChangeCollector {
* Clear the field collector, e.g. the collected changes, to free up memory.
*/
void clear();
/**
* Whether the collector optimizes change detection by narrowing the required query.
*
* @return true if the collector optimizes change detection
*/
boolean isOptimized();
}
static class TermsFieldCollector implements FieldCollector {
private final String sourceFieldName;
private final String targetFieldName;
private final boolean missingBucket;
private final Set<String> changedTerms;
// although we could add null to the hash set, it's easier to handle null separately
private boolean foundNullBucket;
TermsFieldCollector(final String sourceFieldName, final String targetFieldName) {
TermsFieldCollector(final String sourceFieldName, final String targetFieldName, final boolean missingBucket) {
this.sourceFieldName = sourceFieldName;
this.targetFieldName = targetFieldName;
this.missingBucket = missingBucket;
this.changedTerms = new HashSet<>();
this.foundNullBucket = false;
}
@Override
@ -114,11 +127,16 @@ public class CompositeBucketsChangeCollector implements ChangeCollector {
@Override
public boolean collectChanges(Collection<? extends Bucket> buckets) {
changedTerms.clear();
foundNullBucket = false;
for (Bucket b : buckets) {
Object term = b.getKey().get(targetFieldName);
if (term != null) {
changedTerms.add(term.toString());
} else {
// we should not see a null bucket if missing_bucket is false
assert missingBucket;
foundNullBucket = true;
}
}
@ -127,7 +145,44 @@ public class CompositeBucketsChangeCollector implements ChangeCollector {
@Override
public QueryBuilder filterByChanges(long lastCheckpointTimestamp, long nextcheckpointTimestamp) {
if (changedTerms.isEmpty() == false) {
if (missingBucket && foundNullBucket) {
QueryBuilder missingBucketQuery = new BoolQueryBuilder().mustNot(new ExistsQueryBuilder(sourceFieldName));
if (changedTerms.isEmpty()) {
return missingBucketQuery;
}
/**
* Combined query with terms and missing bucket:
*
* "bool": {
* "should": [
* {
* "terms": {
* "source_field": [
* "term1",
* "term2",
* ...
* ]
* }
* },
* {
* "bool": {
* "must_not": [
* {
* "exists": {
* "field": "source_field"
* }
* }
* ]
* }
* }
* ]
* }
*/
return new BoolQueryBuilder().should(new TermsQueryBuilder(sourceFieldName, changedTerms)).should(missingBucketQuery);
} else if (changedTerms.isEmpty() == false) {
return new TermsQueryBuilder(sourceFieldName, changedTerms);
}
return null;
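For reference, a minimal sketch of the combined filter built by hand with the Elasticsearch query builders (field name and term values are illustrative assumptions):

    import org.elasticsearch.index.query.BoolQueryBuilder;
    import org.elasticsearch.index.query.ExistsQueryBuilder;
    import org.elasticsearch.index.query.QueryBuilder;
    import org.elasticsearch.index.query.TermsQueryBuilder;
    import java.util.Set;

    class TermsWithMissingBucketSketch {
        static QueryBuilder combinedFilter() {
            // must_not/exists matches documents where the group_by source field is absent
            QueryBuilder missingBucketQuery = new BoolQueryBuilder().mustNot(new ExistsQueryBuilder("source_field"));
            // the should clauses union the changed terms with the missing bucket
            return new BoolQueryBuilder().should(new TermsQueryBuilder("source_field", Set.of("term1", "term2"))).should(missingBucketQuery);
        }
    }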
@ -136,31 +191,43 @@ public class CompositeBucketsChangeCollector implements ChangeCollector {
@Override
public void clear() {
changedTerms.clear();
foundNullBucket = false;
}
@Override
public AggregationBuilder aggregateChanges() {
return null;
}
@Override
public boolean isOptimized() {
return true;
}
}
static class DateHistogramFieldCollector implements FieldCollector {
private final String sourceFieldName;
private final String targetFieldName;
private final boolean isSynchronizationField;
private final boolean missingBucket;
private final boolean applyOptimizationForSyncField;
private final Rounding.Prepared rounding;
DateHistogramFieldCollector(
final String sourceFieldName,
final String targetFieldName,
final boolean missingBucket,
final Rounding.Prepared rounding,
final boolean isSynchronizationField
) {
this.sourceFieldName = sourceFieldName;
this.targetFieldName = targetFieldName;
this.missingBucket = missingBucket;
this.rounding = rounding;
this.isSynchronizationField = isSynchronizationField;
// if missing_bucket is set to true, we can't apply the optimization; note: this combination
// is illogical, because the sync field should always be present
this.applyOptimizationForSyncField = isSynchronizationField && (missingBucket == false);
}
@Override
@ -176,7 +243,9 @@ public class CompositeBucketsChangeCollector implements ChangeCollector {
@Override
public QueryBuilder filterByChanges(long lastCheckpointTimestamp, long nextcheckpointTimestamp) {
if (isSynchronizationField && lastCheckpointTimestamp > 0) {
if (applyOptimizationForSyncField && lastCheckpointTimestamp > 0) {
assert missingBucket == false;
return new RangeQueryBuilder(sourceFieldName).gte(rounding.round(lastCheckpointTimestamp)).format("epoch_millis");
}
@ -192,16 +261,24 @@ public class CompositeBucketsChangeCollector implements ChangeCollector {
public AggregationBuilder aggregateChanges() {
return null;
}
@Override
public boolean isOptimized() {
// we currently have only one optimization
return applyOptimizationForSyncField;
}
}
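When the optimization does apply, the collector narrows the changes query to a rounded range query; a brief sketch under illustrative assumptions (field name "timestamp", an epoch-millis checkpoint). Rounding down to the bucket boundary ensures partially filled date buckets are re-read:

    import org.elasticsearch.common.Rounding;
    import org.elasticsearch.index.query.QueryBuilder;
    import org.elasticsearch.index.query.RangeQueryBuilder;

    class SyncFieldRangeSketch {
        static QueryBuilder changesSince(Rounding.Prepared rounding, long lastCheckpointTimestamp) {
            // only documents at or after the rounded checkpoint can contain new changes
            return new RangeQueryBuilder("timestamp").gte(rounding.round(lastCheckpointTimestamp)).format("epoch_millis");
        }
    }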
static class HistogramFieldCollector implements FieldCollector {
private final String sourceFieldName;
private final String targetFieldName;
private final boolean missingBucket;
HistogramFieldCollector(final String sourceFieldName, final String targetFieldName) {
HistogramFieldCollector(final String sourceFieldName, final String targetFieldName, final boolean missingBucket) {
this.sourceFieldName = sourceFieldName;
this.targetFieldName = targetFieldName;
this.missingBucket = missingBucket;
}
@Override
@ -226,18 +303,28 @@ public class CompositeBucketsChangeCollector implements ChangeCollector {
public AggregationBuilder aggregateChanges() {
return null;
}
@Override
public boolean isOptimized() {
return false;
}
}
static class GeoTileFieldCollector implements FieldCollector {
private final String sourceFieldName;
private final String targetFieldName;
private final boolean missingBucket;
private final Set<String> changedBuckets;
// although we could add null to the hash set, it's easier to handle null separately
private boolean foundNullBucket;
GeoTileFieldCollector(final String sourceFieldName, final String targetFieldName) {
GeoTileFieldCollector(final String sourceFieldName, final String targetFieldName, final boolean missingBucket) {
this.sourceFieldName = sourceFieldName;
this.targetFieldName = targetFieldName;
this.missingBucket = missingBucket;
this.changedBuckets = new HashSet<>();
this.foundNullBucket = false;
}
@Override
@ -249,11 +336,16 @@ public class CompositeBucketsChangeCollector implements ChangeCollector {
@Override
public boolean collectChanges(Collection<? extends Bucket> buckets) {
changedBuckets.clear();
foundNullBucket = false;
for (Bucket b : buckets) {
Object bucket = b.getKey().get(targetFieldName);
if (bucket != null) {
changedBuckets.add(bucket.toString());
} else {
// we should not see a null bucket if missing_bucket is false
assert missingBucket;
foundNullBucket = true;
}
}
@ -262,16 +354,69 @@ public class CompositeBucketsChangeCollector implements ChangeCollector {
@Override
public QueryBuilder filterByChanges(long lastCheckpointTimestamp, long nextcheckpointTimestamp) {
if (changedBuckets != null && changedBuckets.isEmpty() == false) {
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
changedBuckets.stream().map(GeoTileUtils::toBoundingBox).map(this::toGeoQuery).forEach(boolQueryBuilder::should);
return boolQueryBuilder;
BoolQueryBuilder boundingBoxesQueryBuilder = null;
if (changedBuckets.isEmpty() == false) {
boundingBoxesQueryBuilder = QueryBuilders.boolQuery();
changedBuckets.stream().map(GeoTileUtils::toBoundingBox).map(this::toGeoQuery).forEach(boundingBoxesQueryBuilder::should);
}
return null;
if (missingBucket && foundNullBucket) {
QueryBuilder missingBucketQuery = new BoolQueryBuilder().mustNot(new ExistsQueryBuilder(sourceFieldName));
if (boundingBoxesQueryBuilder == null) {
return missingBucketQuery;
}
/**
* Combined query with geo bounding boxes and missing bucket:
*
* "bool": {
* "should": [
* {
* "geo_bounding_box": {
* "source_field": {
* "top_left": {
* "lat": x1,
* "lon": y1
* },
* "bottom_right": {
* "lat": x2,
* "lon": y2
* }
* }
* }
* },
* {
* "geo_bounding_box": {
* ...
* }
* },
* {
* "bool": {
* "must_not": [
* {
* "exists": {
* "field": "source_field"
* }
* }
* ]
* }
* }
* ]
* }
*/
return boundingBoxesQueryBuilder.should(missingBucketQuery);
}
return boundingBoxesQueryBuilder;
}
@Override
public void clear() {}
public void clear() {
changedBuckets.clear();
foundNullBucket = false;
}
@Override
public AggregationBuilder aggregateChanges() {
@ -285,9 +430,14 @@ public class CompositeBucketsChangeCollector implements ChangeCollector {
new GeoPoint(rectangle.getMinLat(), rectangle.getMaxLon())
);
}
@Override
public boolean isOptimized() {
return true;
}
}
public CompositeBucketsChangeCollector(
private CompositeBucketsChangeCollector(
@Nullable CompositeAggregationBuilder compositeAggregation,
Map<String, FieldCollector> fieldCollectors
) {
@ -368,6 +518,11 @@ public class CompositeBucketsChangeCollector implements ChangeCollector {
return afterKey;
}
@Override
public boolean isOptimized() {
return fieldCollectors.values().stream().anyMatch(FieldCollector::isOptimized);
}
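A minimal sketch of this aggregate check (hypothetical collector mix, using the field collectors defined in this class): one optimized group_by suffices, so a terms group next to an unoptimized histogram group still avoids the slow-transform warning.

    // hypothetical: TermsFieldCollector.isOptimized() == true, HistogramFieldCollector.isOptimized() == false
    Map<String, FieldCollector> collectors = Map.of(
        "by_user", new TermsFieldCollector("user_id", "by_user", false),
        "by_stars", new HistogramFieldCollector("stars", "by_stars", false)
    );
    boolean optimized = collectors.values().stream().anyMatch(FieldCollector::isOptimized); // true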
public static ChangeCollector buildChangeCollector(
@Nullable CompositeAggregationBuilder compositeAggregationBuilder,
Map<String, SingleGroupSource> groups,
@ -385,13 +540,21 @@ public class CompositeBucketsChangeCollector implements ChangeCollector {
case TERMS:
fieldCollectors.put(
entry.getKey(),
new CompositeBucketsChangeCollector.TermsFieldCollector(entry.getValue().getField(), entry.getKey())
new CompositeBucketsChangeCollector.TermsFieldCollector(
entry.getValue().getField(),
entry.getKey(),
entry.getValue().getMissingBucket()
)
);
break;
case HISTOGRAM:
fieldCollectors.put(
entry.getKey(),
new CompositeBucketsChangeCollector.HistogramFieldCollector(entry.getValue().getField(), entry.getKey())
new CompositeBucketsChangeCollector.HistogramFieldCollector(
entry.getValue().getField(),
entry.getKey(),
entry.getValue().getMissingBucket()
)
);
break;
case DATE_HISTOGRAM:
@ -400,6 +563,7 @@ public class CompositeBucketsChangeCollector implements ChangeCollector {
new CompositeBucketsChangeCollector.DateHistogramFieldCollector(
entry.getValue().getField(),
entry.getKey(),
entry.getValue().getMissingBucket(),
((DateHistogramGroupSource) entry.getValue()).getRounding(),
entry.getKey().equals(synchronizationField)
)
@ -408,7 +572,11 @@ public class CompositeBucketsChangeCollector implements ChangeCollector {
case GEOTILE_GRID:
fieldCollectors.put(
entry.getKey(),
new CompositeBucketsChangeCollector.GeoTileFieldCollector(entry.getValue().getField(), entry.getKey())
new CompositeBucketsChangeCollector.GeoTileFieldCollector(
entry.getValue().getField(),
entry.getKey(),
entry.getValue().getMissingBucket()
)
);
break;
default:

View File

@ -284,15 +284,11 @@ public class Pivot implements Function {
public SearchSourceBuilder buildSearchQueryForInitialProgress(SearchSourceBuilder searchSourceBuilder) {
BoolQueryBuilder existsClauses = QueryBuilders.boolQuery();
config.getGroupConfig()
.getGroups()
.values()
// TODO change once we allow missing_buckets
.forEach(src -> {
if (src.getField() != null) {
existsClauses.must(QueryBuilders.existsQuery(src.getField()));
}
});
config.getGroupConfig().getGroups().values().forEach(src -> {
if (src.getMissingBucket() == false && src.getField() != null) {
existsClauses.must(QueryBuilders.existsQuery(src.getField()));
}
});
return searchSourceBuilder.query(existsClauses).size(0).trackTotalHits(true);
}
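Under an illustrative configuration with two groups, one strict on user_id and one with missing_bucket enabled on business_id, the loop above emits a single exists clause, so documents lacking business_id still count toward initial progress. A minimal sketch of the resulting builder calls:

    import org.elasticsearch.index.query.BoolQueryBuilder;
    import org.elasticsearch.index.query.QueryBuilders;
    import org.elasticsearch.search.builder.SearchSourceBuilder;

    class InitialProgressQuerySketch {
        static SearchSourceBuilder build() {
            // only the strict (missing_bucket=false) group contributes an exists clause
            BoolQueryBuilder existsClauses = QueryBuilders.boolQuery().must(QueryBuilders.existsQuery("user_id"));
            return new SearchSourceBuilder().query(existsClauses).size(0).trackTotalHits(true);
        }
    }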

View File

@ -102,7 +102,7 @@ public class CompositeBucketsChangeCollectorTests extends ESTestCase {
Map<String, SingleGroupSource> groups = new LinkedHashMap<>();
// a terms group_by narrows the changes query with a terms query
SingleGroupSource termsGroupBy = new TermsGroupSource("id", null);
SingleGroupSource termsGroupBy = new TermsGroupSource("id", null, false);
groups.put("id", termsGroupBy);
ChangeCollector collector = CompositeBucketsChangeCollector.buildChangeCollector(getCompositeAggregation(groups), groups, null);
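As a hedged counterpart, the missing_bucket variant of the same setup (argument order per the new constructor: field, script, missingBucket):

    // hypothetical: a terms group_by that also tracks documents with a missing "id"
    SingleGroupSource termsWithMissingBucket = new TermsGroupSource("id", null, true);
    groups.put("id", termsWithMissingBucket);
    ChangeCollector collectorWithMissing = CompositeBucketsChangeCollector.buildChangeCollector(getCompositeAggregation(groups), groups, null);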