[Transform] adds geotile_grid support in group_by (#56514) (#56549)

This adds support for grouping by geo points. It uses the [geotile_grid](https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations-bucket-geotilegrid-aggregation.html) aggregation.

I am opting to store the tile results of group_by as a `geo_shape` so that users can query the results. Additionally, the shapes could be visualized and filtered in the kibana maps app.

relates to https://github.com/elastic/elasticsearch/issues/56121
This commit is contained in:
Benjamin Trent 2020-05-11 17:02:40 -04:00 committed by GitHub
parent 1337b35572
commit 1d6b2f074e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 664 additions and 16 deletions

View File

@ -0,0 +1,126 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client.transform.transforms.pivot;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.geo.GeoBoundingBox;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.search.aggregations.bucket.geogrid.GeoTileUtils;
import java.io.IOException;
import java.util.Objects;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg;
/**
 * A {@code geotile_grid} aggregation source for the transform {@code group_by} configuration.
 */
public class GeoTileGroupSource extends SingleGroupSource implements ToXContentObject {
    private static final String NAME = "transform_geo_tile_group";
    private static final ParseField PRECISION = new ParseField("precision");

    private static final ConstructingObjectParser<GeoTileGroupSource, Void> PARSER = createParser();

    private static ConstructingObjectParser<GeoTileGroupSource, Void> createParser() {
        // lenient (true): unknown fields returned by newer servers are ignored
        ConstructingObjectParser<GeoTileGroupSource, Void> parser = new ConstructingObjectParser<>(
            NAME,
            true,
            args -> new GeoTileGroupSource((String) args[0], (Integer) args[1], (GeoBoundingBox) args[2])
        );
        parser.declareString(optionalConstructorArg(), FIELD);
        parser.declareInt(optionalConstructorArg(), PRECISION);
        parser.declareField(
            optionalConstructorArg(),
            (p, c) -> GeoBoundingBox.parseBoundingBox(p),
            GeoBoundingBox.BOUNDS_FIELD,
            ObjectParser.ValueType.OBJECT
        );
        return parser;
    }

    private final Integer precision;
    private final GeoBoundingBox geoBoundingBox;

    /**
     * @param field the geo_point field to group on; may be null
     * @param precision the geotile zoom level; range-checked by
     *        {@link GeoTileUtils#checkPrecisionRange} when non-null
     * @param boundingBox optional bounding box limiting which tiles are considered
     */
    public GeoTileGroupSource(final String field, final Integer precision, final GeoBoundingBox boundingBox) {
        // script grouping is not supported for geotile_grid, hence the null script argument
        super(field, null);
        if (precision != null) {
            GeoTileUtils.checkPrecisionRange(precision);
        }
        this.precision = precision;
        this.geoBoundingBox = boundingBox;
    }

    @Override
    public Type getType() {
        return Type.GEOTILE_GRID;
    }

    public Integer getPrecision() {
        return precision;
    }

    public GeoBoundingBox getGeoBoundingBox() {
        return geoBoundingBox;
    }

    public static GeoTileGroupSource fromXContent(final XContentParser parser) {
        return PARSER.apply(parser, null);
    }

    @Override
    public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
        builder.startObject();
        super.innerXContent(builder, params);
        if (precision != null) {
            builder.field(PRECISION.getPreferredName(), precision);
        }
        if (geoBoundingBox != null) {
            geoBoundingBox.toXContent(builder, params);
        }
        return builder.endObject();
    }

    @Override
    public boolean equals(Object other) {
        if (this == other) {
            return true;
        }
        if (other == null || getClass() != other.getClass()) {
            return false;
        }
        GeoTileGroupSource that = (GeoTileGroupSource) other;
        return Objects.equals(field, that.field)
            && Objects.equals(precision, that.precision)
            && Objects.equals(geoBoundingBox, that.geoBoundingBox);
    }

    @Override
    public int hashCode() {
        return Objects.hash(field, precision, geoBoundingBox);
    }
}

View File

@ -97,6 +97,9 @@ public class GroupConfig implements ToXContentObject {
case "date_histogram": case "date_histogram":
groupSource = DateHistogramGroupSource.fromXContent(parser); groupSource = DateHistogramGroupSource.fromXContent(parser);
break; break;
case "geotile_grid":
groupSource = GeoTileGroupSource.fromXContent(parser);
break;
default: default:
// not a valid group source. Consume up to the dest field end object // not a valid group source. Consume up to the dest field end object
consumeUntilEndObject(parser, 2); consumeUntilEndObject(parser, 2);

View File

@ -36,7 +36,8 @@ public abstract class SingleGroupSource implements ToXContentObject {
public enum Type { public enum Type {
TERMS, TERMS,
HISTOGRAM, HISTOGRAM,
DATE_HISTOGRAM; DATE_HISTOGRAM,
GEOTILE_GRID;
public String value() { public String value() {
return name().toLowerCase(Locale.ROOT); return name().toLowerCase(Locale.ROOT);

View File

@ -0,0 +1,67 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client.transform.transforms.pivot;
import org.elasticsearch.common.geo.GeoBoundingBox;
import org.elasticsearch.common.geo.GeoPoint;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.geo.GeometryTestUtils;
import org.elasticsearch.geometry.Rectangle;
import org.elasticsearch.search.aggregations.bucket.geogrid.GeoTileUtils;
import org.elasticsearch.test.AbstractXContentTestCase;
import java.io.IOException;
import java.util.function.Predicate;
public class GeoTileGroupSourceTests extends AbstractXContentTestCase<GeoTileGroupSource> {

    /**
     * Builds a random {@link GeoTileGroupSource}; field, precision, and bounding box
     * are each independently allowed to be null.
     */
    public static GeoTileGroupSource randomGeoTileGroupSource() {
        final Rectangle rect = GeometryTestUtils.randomRectangle();
        final String field = randomBoolean() ? null : randomAlphaOfLength(10);
        final Integer precision = randomBoolean() ? null : randomIntBetween(1, GeoTileUtils.MAX_ZOOM);
        final GeoBoundingBox bounds = randomBoolean()
            ? null
            : new GeoBoundingBox(
                new GeoPoint(rect.getMaxLat(), rect.getMinLon()),
                new GeoPoint(rect.getMinLat(), rect.getMaxLon())
            );
        return new GeoTileGroupSource(field, precision, bounds);
    }

    @Override
    protected GeoTileGroupSource createTestInstance() {
        return randomGeoTileGroupSource();
    }

    @Override
    protected GeoTileGroupSource doParseInstance(XContentParser parser) throws IOException {
        return GeoTileGroupSource.fromXContent(parser);
    }

    @Override
    protected boolean supportsUnknownFields() {
        return true;
    }

    @Override
    protected Predicate<String> getRandomFieldsExcludeFilter() {
        // only tolerate injected unknown fields at the root of the object
        return field -> field.isEmpty() == false;
    }
}

View File

@ -55,8 +55,11 @@ public class GroupConfigTests extends AbstractXContentTestCase<GroupConfig> {
groupBy = HistogramGroupSourceTests.randomHistogramGroupSource(); groupBy = HistogramGroupSourceTests.randomHistogramGroupSource();
break; break;
case DATE_HISTOGRAM: case DATE_HISTOGRAM:
default:
groupBy = DateHistogramGroupSourceTests.randomDateHistogramGroupSource(); groupBy = DateHistogramGroupSourceTests.randomDateHistogramGroupSource();
break;
case GEOTILE_GRID:
default:
groupBy = GeoTileGroupSourceTests.randomGeoTileGroupSource();
} }
groups.put(targetFieldName, groupBy); groups.put(targetFieldName, groupBy);
} }

View File

@ -0,0 +1,72 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client.transform.transforms.pivot.hlrc;
import org.elasticsearch.client.AbstractResponseTestCase;
import org.elasticsearch.common.geo.GeoBoundingBox;
import org.elasticsearch.common.geo.GeoPoint;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.geo.GeometryTestUtils;
import org.elasticsearch.geometry.Rectangle;
import org.elasticsearch.search.aggregations.bucket.geogrid.GeoTileUtils;
import org.elasticsearch.xpack.core.transform.transforms.pivot.GeoTileGroupSource;
import static org.hamcrest.Matchers.equalTo;
public class GeoTileGroupSourceTests extends AbstractResponseTestCase<
    GeoTileGroupSource,
    org.elasticsearch.client.transform.transforms.pivot.GeoTileGroupSource> {

    /**
     * Builds a random server-side {@link GeoTileGroupSource}; field, precision, and
     * bounding box are each independently allowed to be null.
     */
    public static GeoTileGroupSource randomGeoTileGroupSource() {
        final Rectangle rect = GeometryTestUtils.randomRectangle();
        final String field = randomBoolean() ? null : randomAlphaOfLength(10);
        final Integer precision = randomBoolean() ? null : randomIntBetween(1, GeoTileUtils.MAX_ZOOM);
        final GeoBoundingBox bounds = randomBoolean()
            ? null
            : new GeoBoundingBox(
                new GeoPoint(rect.getMaxLat(), rect.getMinLon()),
                new GeoPoint(rect.getMinLat(), rect.getMaxLon())
            );
        return new GeoTileGroupSource(field, precision, bounds);
    }

    @Override
    protected GeoTileGroupSource createServerTestInstance(XContentType xContentType) {
        return randomGeoTileGroupSource();
    }

    @Override
    protected org.elasticsearch.client.transform.transforms.pivot.GeoTileGroupSource doParseToClientInstance(XContentParser parser) {
        return org.elasticsearch.client.transform.transforms.pivot.GeoTileGroupSource.fromXContent(parser);
    }

    @Override
    protected void assertInstances(
        GeoTileGroupSource serverTestInstance,
        org.elasticsearch.client.transform.transforms.pivot.GeoTileGroupSource clientInstance
    ) {
        assertThat(serverTestInstance.getField(), equalTo(clientInstance.getField()));
        // the client class never parses a script for geotile_grid
        assertNull(clientInstance.getScript());
        assertThat(serverTestInstance.getPrecision(), equalTo(clientInstance.getPrecision()));
        assertThat(serverTestInstance.getGeoBoundingBox(), equalTo(clientInstance.getGeoBoundingBox()));
    }
}

View File

@ -251,6 +251,14 @@ public final class GeoTileUtils {
return toBoundingBox(hashAsInts[1], hashAsInts[2], hashAsInts[0]); return toBoundingBox(hashAsInts[1], hashAsInts[2], hashAsInts[0]);
} }
/**
* Decode a string bucket key in "zoom/x/y" format to a bounding box of the tile corners
*/
public static Rectangle toBoundingBox(String hash) {
int[] hashAsInts = parseHash(hash);
return toBoundingBox(hashAsInts[1], hashAsInts[2], hashAsInts[0]);
}
public static Rectangle toBoundingBox(int xTile, int yTile, int precision) { public static Rectangle toBoundingBox(int xTile, int yTile, int precision) {
final double tiles = validateZXY(precision, xTile, yTile); final double tiles = validateZXY(precision, xTile, yTile);
final double minN = Math.PI - (2.0 * Math.PI * (yTile + 1)) / tiles; final double minN = Math.PI - (2.0 * Math.PI * (yTile + 1)) / tiles;

View File

@ -0,0 +1,193 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.transform.transforms.pivot;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.geo.GeoBoundingBox;
import org.elasticsearch.common.geo.GeoPoint;
import org.elasticsearch.common.geo.parsers.ShapeParser;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.geometry.Rectangle;
import org.elasticsearch.index.mapper.GeoShapeFieldMapper;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.GeoBoundingBoxQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.aggregations.bucket.geogrid.GeoTileUtils;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg;
/**
 * A {@code geotile_grid} aggregation source for the transform {@code group_by} configuration.
 * Bucket keys produced by this group source are geotile hashes in "zoom/x/y" format, which
 * are transformed into bounding-box shapes for indexing (see {@link #transformBucketKey}).
 */
public class GeoTileGroupSource extends SingleGroupSource {
    private static final String NAME = "transform_geo_tile_group";
    private static final ParseField PRECISION = new ParseField("precision");

    private static final ConstructingObjectParser<GeoTileGroupSource, Void> STRICT_PARSER = createParser(false);
    private static final ConstructingObjectParser<GeoTileGroupSource, Void> LENIENT_PARSER = createParser(true);

    private static ConstructingObjectParser<GeoTileGroupSource, Void> createParser(boolean lenient) {
        ConstructingObjectParser<GeoTileGroupSource, Void> parser = new ConstructingObjectParser<>(NAME, lenient, (args) -> {
            String field = (String) args[0];
            Integer precision = (Integer) args[1];
            GeoBoundingBox boundingBox = (GeoBoundingBox) args[2];
            return new GeoTileGroupSource(field, precision, boundingBox);
        });
        // all three fields are optional; precision range validation happens in the constructor
        parser.declareString(optionalConstructorArg(), FIELD);
        parser.declareInt(optionalConstructorArg(), PRECISION);
        parser.declareField(
            optionalConstructorArg(),
            (p, context) -> GeoBoundingBox.parseBoundingBox(p),
            GeoBoundingBox.BOUNDS_FIELD,
            ObjectParser.ValueType.OBJECT
        );
        return parser;
    }

    // geotile zoom level; null means "use the aggregation default"
    private final Integer precision;
    // optional bounding box limiting which tiles are generated; null means unbounded
    private final GeoBoundingBox geoBoundingBox;

    /**
     * @param field the geo_point field to group on; may be null
     * @param precision the geotile zoom level; range-checked when non-null
     * @param boundingBox optional bounding box limiting which tiles are considered
     */
    public GeoTileGroupSource(final String field, final Integer precision, final GeoBoundingBox boundingBox) {
        // script grouping is not supported for geotile_grid, hence the null script argument
        super(field, null);
        if (precision != null) {
            GeoTileUtils.checkPrecisionRange(precision);
        }
        this.precision = precision;
        this.geoBoundingBox = boundingBox;
    }

    /**
     * Wire deserialization; field order must stay in sync with {@link #writeTo}.
     */
    public GeoTileGroupSource(StreamInput in) throws IOException {
        super(in);
        precision = in.readOptionalVInt();
        geoBoundingBox = in.readOptionalWriteable(GeoBoundingBox::new);
    }

    @Override
    public Type getType() {
        return Type.GEOTILE_GRID;
    }

    @Override
    public void writeTo(StreamOutput out) throws IOException {
        // field order must stay in sync with the StreamInput constructor above
        super.writeTo(out);
        out.writeOptionalVInt(precision);
        out.writeOptionalWriteable(geoBoundingBox);
    }

    public Integer getPrecision() {
        return precision;
    }

    public GeoBoundingBox getGeoBoundingBox() {
        return geoBoundingBox;
    }

    public static GeoTileGroupSource fromXContent(final XContentParser parser, boolean lenient) {
        return lenient ? LENIENT_PARSER.apply(parser, null) : STRICT_PARSER.apply(parser, null);
    }

    /**
     * Builds a query matching documents that fall inside any of the changed tiles, so that
     * only their buckets are recomputed on an incremental update. The synchronization
     * parameters are unused here; time-based filtering is handled elsewhere.
     *
     * @return a bool query with one geo_bounding_box "should" clause per changed tile,
     *         or null when there are no changed buckets
     */
    @Override
    public QueryBuilder getIncrementalBucketUpdateFilterQuery(
        Set<String> changedBuckets,
        String synchronizationField,
        long synchronizationTimestamp
    ) {
        if (changedBuckets != null && changedBuckets.isEmpty() == false) {
            BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
            changedBuckets.stream()
                .map(GeoTileUtils::toBoundingBox)
                .map(this::toGeoQuery)
                .forEach(boolQueryBuilder::should);
            return boolQueryBuilder;
        }
        return null;
    }

    @Override
    public boolean supportsIncrementalBucketUpdate() {
        return true;
    }

    @Override
    public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
        builder.startObject();
        super.innerXContent(builder, params);
        if (precision != null) {
            builder.field(PRECISION.getPreferredName(), precision);
        }
        if (geoBoundingBox != null) {
            geoBoundingBox.toXContent(builder, params);
        }
        builder.endObject();
        return builder;
    }

    @Override
    public boolean equals(Object other) {
        if (this == other) {
            return true;
        }
        if (other == null || getClass() != other.getClass()) {
            return false;
        }
        final GeoTileGroupSource that = (GeoTileGroupSource) other;
        return Objects.equals(this.field, that.field)
            && Objects.equals(this.precision, that.precision)
            && Objects.equals(this.geoBoundingBox, that.geoBoundingBox);
    }

    @Override
    public int hashCode() {
        return Objects.hash(field, precision, geoBoundingBox);
    }

    /**
     * The destination field is mapped as geo_shape so the tile results remain queryable.
     */
    @Override
    public String getMappingType() {
        return GeoShapeFieldMapper.CONTENT_TYPE;
    }

    /**
     * Converts a composite aggregation bucket key (a geotile "zoom/x/y" hash string) into a
     * shape map suitable for indexing into the geo_shape destination field: a "BBOX" type with
     * the tile's top-left [minLon, maxLat] and bottom-right [maxLon, minLat] corners.
     * NOTE(review): assumes the geo_shape mapper accepts this "BBOX"/coordinates form — confirm
     * against ShapeParser.
     */
    @Override
    public Object transformBucketKey(Object key) {
        assert key instanceof String;
        Rectangle rectangle = GeoTileUtils.toBoundingBox(key.toString());
        final Map<String, Object> geoShape = new HashMap<>();
        geoShape.put(ShapeParser.FIELD_TYPE.getPreferredName(), "BBOX");
        geoShape.put(
            ShapeParser.FIELD_COORDINATES.getPreferredName(),
            Arrays.asList(
                new Double[] { rectangle.getMinLon(), rectangle.getMaxLat() },
                new Double[] { rectangle.getMaxLon(), rectangle.getMinLat() }
            )
        );
        return geoShape;
    }

    // translates a tile rectangle into a geo_bounding_box query on the grouped field
    private GeoBoundingBoxQueryBuilder toGeoQuery(Rectangle rectangle) {
        return QueryBuilders.geoBoundingBoxQuery(field)
            .setCorners(
                new GeoPoint(rectangle.getMaxLat(), rectangle.getMinLon()),
                new GeoPoint(rectangle.getMinLat(), rectangle.getMaxLon())
            );
    }
}

View File

@ -60,6 +60,8 @@ public class GroupConfig implements Writeable, ToXContentObject {
return new HistogramGroupSource(stream); return new HistogramGroupSource(stream);
case DATE_HISTOGRAM: case DATE_HISTOGRAM:
return new DateHistogramGroupSource(stream); return new DateHistogramGroupSource(stream);
case GEOTILE_GRID:
return new GeoTileGroupSource(stream);
default: default:
throw new IOException("Unknown group type"); throw new IOException("Unknown group type");
} }
@ -177,6 +179,9 @@ public class GroupConfig implements Writeable, ToXContentObject {
case DATE_HISTOGRAM: case DATE_HISTOGRAM:
groupSource = DateHistogramGroupSource.fromXContent(parser, lenient); groupSource = DateHistogramGroupSource.fromXContent(parser, lenient);
break; break;
case GEOTILE_GRID:
groupSource = GeoTileGroupSource.fromXContent(parser, lenient);
break;
default: default:
throw new ParsingException(parser.getTokenLocation(), "invalid grouping type: " + groupType); throw new ParsingException(parser.getTokenLocation(), "invalid grouping type: " + groupType);
} }

View File

@ -7,6 +7,7 @@
package org.elasticsearch.xpack.core.transform.transforms.pivot; package org.elasticsearch.xpack.core.transform.transforms.pivot;
import org.elasticsearch.Version; import org.elasticsearch.Version;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField; import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Strings; import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
@ -32,7 +33,8 @@ public abstract class SingleGroupSource implements Writeable, ToXContentObject {
public enum Type { public enum Type {
TERMS(0), TERMS(0),
HISTOGRAM(1), HISTOGRAM(1),
DATE_HISTOGRAM(2); DATE_HISTOGRAM(2),
GEOTILE_GRID(3);
private final byte id; private final byte id;
@ -52,6 +54,8 @@ public abstract class SingleGroupSource implements Writeable, ToXContentObject {
return HISTOGRAM; return HISTOGRAM;
case 2: case 2:
return DATE_HISTOGRAM; return DATE_HISTOGRAM;
case 3:
return GEOTILE_GRID;
default: default:
throw new IllegalArgumentException("unknown type"); throw new IllegalArgumentException("unknown type");
} }
@ -154,4 +158,22 @@ public abstract class SingleGroupSource implements Writeable, ToXContentObject {
public String toString() { public String toString() {
return Strings.toString(this, true, true); return Strings.toString(this, true, true);
} }
/**
* @return The preferred mapping type if it exists. Is nullable.
*/
@Nullable
public String getMappingType() {
return null;
}
/**
* This will transform a composite aggregation bucket key into the desired format for indexing.
*
* @param key The bucket key for this group source
* @return the transformed bucket key for indexing
*/
public Object transformBucketKey(Object key) {
return key;
}
} }

View File

@ -0,0 +1,49 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.transform.transforms.pivot;
import org.elasticsearch.common.geo.GeoBoundingBox;
import org.elasticsearch.common.geo.GeoPoint;
import org.elasticsearch.common.io.stream.Writeable.Reader;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.geo.GeometryTestUtils;
import org.elasticsearch.geometry.Rectangle;
import org.elasticsearch.search.aggregations.bucket.geogrid.GeoTileUtils;
import org.elasticsearch.test.AbstractSerializingTestCase;
import java.io.IOException;
public class GeoTileGroupSourceTests extends AbstractSerializingTestCase<GeoTileGroupSource> {

    /**
     * Builds a random {@link GeoTileGroupSource}; field, precision, and bounding box
     * are each independently allowed to be null.
     */
    public static GeoTileGroupSource randomGeoTileGroupSource() {
        final Rectangle rect = GeometryTestUtils.randomRectangle();
        final String field = randomBoolean() ? null : randomAlphaOfLength(10);
        final Integer precision = randomBoolean() ? null : randomIntBetween(1, GeoTileUtils.MAX_ZOOM);
        final GeoBoundingBox bounds = randomBoolean()
            ? null
            : new GeoBoundingBox(
                new GeoPoint(rect.getMaxLat(), rect.getMinLon()),
                new GeoPoint(rect.getMinLat(), rect.getMaxLon())
            );
        return new GeoTileGroupSource(field, precision, bounds);
    }

    @Override
    protected GeoTileGroupSource doParseInstance(XContentParser parser) throws IOException {
        // strict parsing: internally persisted config must not contain unknown fields
        return GeoTileGroupSource.fromXContent(parser, false);
    }

    @Override
    protected GeoTileGroupSource createTestInstance() {
        return randomGeoTileGroupSource();
    }

    @Override
    protected Reader<GeoTileGroupSource> instanceReader() {
        return GeoTileGroupSource::new;
    }
}

View File

@ -50,8 +50,11 @@ public class GroupConfigTests extends AbstractSerializingTestCase<GroupConfig> {
groupBy = HistogramGroupSourceTests.randomHistogramGroupSource(); groupBy = HistogramGroupSourceTests.randomHistogramGroupSource();
break; break;
case DATE_HISTOGRAM: case DATE_HISTOGRAM:
default:
groupBy = DateHistogramGroupSourceTests.randomDateHistogramGroupSource(); groupBy = DateHistogramGroupSourceTests.randomDateHistogramGroupSource();
break;
case GEOTILE_GRID:
default:
groupBy = GeoTileGroupSourceTests.randomGeoTileGroupSource();
} }
source.put(targetFieldName, Collections.singletonMap(type.value(), getSource(groupBy))); source.put(targetFieldName, Collections.singletonMap(type.value(), getSource(groupBy)));

View File

@ -25,10 +25,14 @@ import java.util.Set;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.xpack.core.security.authc.support.UsernamePasswordToken.basicAuthHeaderValue; import static org.elasticsearch.xpack.core.security.authc.support.UsernamePasswordToken.basicAuthHeaderValue;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.lessThan; import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.nullValue;
public class TransformPivotRestIT extends TransformRestTestCase { public class TransformPivotRestIT extends TransformRestTestCase {
@ -930,6 +934,70 @@ public class TransformPivotRestIT extends TransformRestTestCase {
assertEquals((4 + 15), Double.valueOf(latlon[1]), 0.000001); assertEquals((4 + 15), Double.valueOf(latlon[1]), 0.000001);
} }
@SuppressWarnings("unchecked")
public void testPivotWithGeotileGroupBy() throws Exception {
String transformId = "geotile_grid_group_by";
String transformIndex = "geotile_grid_pivot_reviews";
setupDataAccessRole(DATA_ACCESS_ROLE, REVIEWS_INDEX_NAME, transformIndex);
final Request createTransformRequest = createRequestWithAuth(
"PUT",
getTransformEndpoint() + transformId,
BASIC_AUTH_VALUE_TRANSFORM_ADMIN_WITH_SOME_DATA_ACCESS
);
String config = "{"
+ " \"source\": {\"index\":\""
+ REVIEWS_INDEX_NAME
+ "\"},"
+ " \"dest\": {\"index\":\""
+ transformIndex
+ "\"},";
config += " \"pivot\": {"
+ " \"group_by\": {"
+ " \"tile\": {"
+ " \"geotile_grid\": {"
+ " \"field\": \"location\","
+ " \"precision\": 12"
+ " } } },"
+ " \"aggregations\": {"
+ " \"avg_rating\": {"
+ " \"avg\": {"
+ " \"field\": \"stars\""
+ " } },"
+ " \"boundary\": {"
+ " \"geo_bounds\": {\"field\": \"location\"}"
+ " } } }"
+ "}";
createTransformRequest.setJsonEntity(config);
Map<String, Object> createTransformResponse = entityAsMap(client().performRequest(createTransformRequest));
assertThat(createTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE));
startAndWaitForTransform(transformId, transformIndex, BASIC_AUTH_VALUE_TRANSFORM_ADMIN_WITH_SOME_DATA_ACCESS);
assertTrue(indexExists(transformIndex));
// we expect 27 documents as there are that many tiles at this zoom
Map<String, Object> indexStats = getAsMap(transformIndex + "/_stats");
assertEquals(27, XContentMapValues.extractValue("_all.total.docs.count", indexStats));
// Verify that the format is sane for the geo grid
Map<String, Object> searchResult = getAsMap(transformIndex + "/_search?size=1");
Number actual = (Number) ((List<?>) XContentMapValues.extractValue("hits.hits._source.avg_rating", searchResult)).get(0);
assertThat(actual, is(not(nullValue())));
Map<String, Object> actualObj = (Map<String, Object>) ((List<?>) XContentMapValues.extractValue(
"hits.hits._source.tile",
searchResult
)).get(0);
assertThat(actualObj.get("type"), equalTo("BBOX"));
List<List<Double>> coordinates = (List<List<Double>>) actualObj.get("coordinates");
assertThat(coordinates, is(not(nullValue())));
assertThat(coordinates, hasSize(2));
assertThat(coordinates.get(0), hasSize(2));
assertThat(coordinates.get(1), hasSize(2));
}
public void testPivotWithWeightedAvgAgg() throws Exception { public void testPivotWithWeightedAvgAgg() throws Exception {
String transformId = "weighted_avg_agg_transform"; String transformId = "weighted_avg_agg_transform";
String transformIndex = "weighted_avg_pivot_reviews"; String transformIndex = "weighted_avg_pivot_reviews";

View File

@ -81,10 +81,10 @@ public final class AggregationResultUtils {
// - update documents // - update documents
IDGenerator idGen = new IDGenerator(); IDGenerator idGen = new IDGenerator();
groups.getGroups().keySet().forEach(destinationFieldName -> { groups.getGroups().forEach((destinationFieldName, singleGroupSource) -> {
Object value = bucket.getKey().get(destinationFieldName); Object value = bucket.getKey().get(destinationFieldName);
idGen.add(destinationFieldName, value); idGen.add(destinationFieldName, value);
updateDocument(document, destinationFieldName, value); updateDocument(document, destinationFieldName, singleGroupSource.transformBucketKey(value));
}); });
List<String> aggNames = aggregationBuilders.stream().map(AggregationBuilder::getName).collect(Collectors.toList()); List<String> aggNames = aggregationBuilders.stream().map(AggregationBuilder::getName).collect(Collectors.toList());

View File

@ -8,6 +8,7 @@ package org.elasticsearch.xpack.transform.transforms.pivot;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesAction; import org.elasticsearch.action.fieldcaps.FieldCapabilitiesAction;
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesRequest; import org.elasticsearch.action.fieldcaps.FieldCapabilitiesRequest;
@ -15,6 +16,7 @@ import org.elasticsearch.action.fieldcaps.FieldCapabilitiesResponse;
import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.Client; import org.elasticsearch.client.Client;
import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.index.mapper.KeywordFieldMapper;
import org.elasticsearch.index.mapper.NumberFieldMapper; import org.elasticsearch.index.mapper.NumberFieldMapper;
import org.elasticsearch.search.aggregations.AggregationBuilder; import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.PipelineAggregationBuilder; import org.elasticsearch.search.aggregations.PipelineAggregationBuilder;
@ -68,10 +70,20 @@ public final class SchemaUtil {
Map<String, String> aggregationTypes = new HashMap<>(); Map<String, String> aggregationTypes = new HashMap<>();
// collects the fieldnames and target fieldnames used for grouping // collects the fieldnames and target fieldnames used for grouping
Map<String, String> fieldNamesForGrouping = new HashMap<>(); Map<String, String> fieldNamesForGrouping = new HashMap<>();
// collects the target mapping types used for grouping
Map<String, String> fieldTypesForGrouping = new HashMap<>();
config.getGroupConfig() config.getGroupConfig()
.getGroups() .getGroups()
.forEach((destinationFieldName, group) -> { fieldNamesForGrouping.put(destinationFieldName, group.getField()); }); .forEach((destinationFieldName, group) -> {
// We will always need the field name for the grouping to create the mapping
fieldNamesForGrouping.put(destinationFieldName, group.getField());
// Sometimes the group config will supply a desired mapping as well
if (group.getMappingType() != null) {
fieldTypesForGrouping.put(destinationFieldName, group.getMappingType());
}
});
for (AggregationBuilder agg : config.getAggregationConfig().getAggregatorFactories()) { for (AggregationBuilder agg : config.getAggregationConfig().getAggregatorFactories()) {
Tuple<Map<String, String>, Map<String, String>> inputAndOutputTypes = Aggregations.getAggregationInputAndOutputTypes(agg); Tuple<Map<String, String>, Map<String, String>> inputAndOutputTypes = Aggregations.getAggregationInputAndOutputTypes(agg);
@ -95,7 +107,13 @@ public final class SchemaUtil {
allFieldNames.values().toArray(new String[0]), allFieldNames.values().toArray(new String[0]),
ActionListener.wrap( ActionListener.wrap(
sourceMappings -> listener.onResponse( sourceMappings -> listener.onResponse(
resolveMappings(aggregationSourceFieldNames, aggregationTypes, fieldNamesForGrouping, sourceMappings) resolveMappings(
aggregationSourceFieldNames,
aggregationTypes,
fieldNamesForGrouping,
fieldTypesForGrouping,
sourceMappings
)
), ),
listener::onFailure listener::onFailure
) )
@ -131,6 +149,7 @@ public final class SchemaUtil {
Map<String, String> aggregationSourceFieldNames, Map<String, String> aggregationSourceFieldNames,
Map<String, String> aggregationTypes, Map<String, String> aggregationTypes,
Map<String, String> fieldNamesForGrouping, Map<String, String> fieldNamesForGrouping,
Map<String, String> fieldTypesForGrouping,
Map<String, String> sourceMappings Map<String, String> sourceMappings
) { ) {
Map<String, String> targetMapping = new HashMap<>(); Map<String, String> targetMapping = new HashMap<>();
@ -140,25 +159,34 @@ public final class SchemaUtil {
String sourceMapping = sourceFieldName == null ? null : sourceMappings.get(sourceFieldName); String sourceMapping = sourceFieldName == null ? null : sourceMappings.get(sourceFieldName);
String destinationMapping = Aggregations.resolveTargetMapping(aggregationName, sourceMapping); String destinationMapping = Aggregations.resolveTargetMapping(aggregationName, sourceMapping);
logger.debug("Deduced mapping for: [{}], agg type [{}] to [{}]", targetFieldName, aggregationName, destinationMapping); logger.debug(() -> new ParameterizedMessage(
"Deduced mapping for: [{}], agg type [{}] to [{}]",
targetFieldName,
aggregationName,
destinationMapping
));
if (Aggregations.isDynamicMapping(destinationMapping)) { if (Aggregations.isDynamicMapping(destinationMapping)) {
logger.debug("Dynamic target mapping set for field [{}] and aggregation [{}]", targetFieldName, aggregationName); logger.debug(() -> new ParameterizedMessage(
"Dynamic target mapping set for field [{}] and aggregation [{}]",
targetFieldName,
aggregationName
));
} else if (destinationMapping != null) { } else if (destinationMapping != null) {
targetMapping.put(targetFieldName, destinationMapping); targetMapping.put(targetFieldName, destinationMapping);
} else { } else {
logger.warn("Failed to deduce mapping for [" + targetFieldName + "], fall back to dynamic mapping."); logger.warn("Failed to deduce mapping for [{}], fall back to dynamic mapping.", targetFieldName);
} }
}); });
fieldNamesForGrouping.forEach((targetFieldName, sourceFieldName) -> { fieldNamesForGrouping.forEach((targetFieldName, sourceFieldName) -> {
String destinationMapping = sourceMappings.get(sourceFieldName); String destinationMapping = fieldTypesForGrouping.computeIfAbsent(targetFieldName, (s) -> sourceMappings.get(sourceFieldName));
logger.debug("Deduced mapping for: [{}] to [{}]", targetFieldName, destinationMapping); logger.debug(() -> new ParameterizedMessage("Deduced mapping for: [{}] to [{}]", targetFieldName, destinationMapping));
if (destinationMapping != null) { if (destinationMapping != null) {
targetMapping.put(targetFieldName, destinationMapping); targetMapping.put(targetFieldName, destinationMapping);
} else { } else {
logger.warn("Failed to deduce mapping for [" + targetFieldName + "], fall back to keyword."); logger.warn("Failed to deduce mapping for [{}], fall back to keyword.", targetFieldName);
targetMapping.put(targetFieldName, "keyword"); targetMapping.put(targetFieldName, KeywordFieldMapper.CONTENT_TYPE);
} }
}); });
@ -196,7 +224,7 @@ public final class SchemaUtil {
// TODO: overwrites types, requires resolve if // TODO: overwrites types, requires resolve if
// types are mixed // types are mixed
capabilitiesMap.forEach((name, capability) -> { capabilitiesMap.forEach((name, capability) -> {
logger.trace("Extracted type for [{}] : [{}]", fieldName, capability.getType()); logger.trace(() -> new ParameterizedMessage("Extracted type for [{}] : [{}]", fieldName, capability.getType()));
extractedTypes.put(fieldName, capability.getType()); extractedTypes.put(fieldName, capability.getType());
}); });
} }