Allow index filtering in field capabilities API (#57276) (#58299)

This change allows to use an `index_filter` in the
field capabilities API. Indices are filtered from
the response if the provided query rewrites to `match_none`
on every shard:

````
GET metrics-*
{
  "index_filter": {
    "bool": {
      "must": [
        "range": {
          "@timestamp": {
            "gt": "2019"
          }
        }
      }
  }
}
````

The filtering is done on a best-effort basis, it uses the can match phase
to rewrite queries to `match_none` instead of fully executing the request.
The first shard that can match the filter is used to create the field
capabilities response for the entire index.

Closes #56195
This commit is contained in:
Jim Ferenczi 2020-06-18 10:23:26 +02:00 committed by GitHub
parent ffeff4090e
commit 82db0b575c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 725 additions and 117 deletions

View File

@ -539,13 +539,17 @@ final class RequestConverters {
return request;
}
static Request fieldCaps(FieldCapabilitiesRequest fieldCapabilitiesRequest) {
Request request = new Request(HttpGet.METHOD_NAME, endpoint(fieldCapabilitiesRequest.indices(), "_field_caps"));
static Request fieldCaps(FieldCapabilitiesRequest fieldCapabilitiesRequest) throws IOException {
String methodName = fieldCapabilitiesRequest.indexFilter() != null ? HttpPost.METHOD_NAME : HttpGet.METHOD_NAME;
Request request = new Request(methodName, endpoint(fieldCapabilitiesRequest.indices(), "_field_caps"));
Params params = new Params();
params.withFields(fieldCapabilitiesRequest.fields());
params.withIndicesOptions(fieldCapabilitiesRequest.indicesOptions());
request.addParameters(params.asMap());
if (fieldCapabilitiesRequest.indexFilter() != null) {
request.setEntity(createEntity(fieldCapabilitiesRequest, REQUEST_BODY_CONTENT_TYPE));
}
return request;
}

View File

@ -1661,7 +1661,7 @@ public class RequestConvertersTests extends ESTestCase {
assertToXContentBody(mtvRequest, request.getEntity());
}
public void testFieldCaps() {
public void testFieldCaps() throws IOException {
// Create a random request.
String[] indices = randomIndicesNames(0, 5);
String[] fields = generateRandomStringArray(5, 10, false, false);
@ -1699,6 +1699,48 @@ public class RequestConvertersTests extends ESTestCase {
assertNull(request.getEntity());
}
public void testFieldCapsWithIndexFilter() throws IOException {
// Create a random request.
String[] indices = randomIndicesNames(0, 5);
String[] fields = generateRandomStringArray(5, 10, false, false);
FieldCapabilitiesRequest fieldCapabilitiesRequest = new FieldCapabilitiesRequest()
.indices(indices)
.fields(fields)
.indexFilter(QueryBuilders.matchAllQuery());
Map<String, String> indicesOptionsParams = new HashMap<>();
setRandomIndicesOptions(fieldCapabilitiesRequest::indicesOptions, fieldCapabilitiesRequest::indicesOptions, indicesOptionsParams);
Request request = RequestConverters.fieldCaps(fieldCapabilitiesRequest);
// Verify that the resulting REST request looks as expected.
StringJoiner endpoint = new StringJoiner("/", "/", "");
String joinedIndices = String.join(",", indices);
if (!joinedIndices.isEmpty()) {
endpoint.add(joinedIndices);
}
endpoint.add("_field_caps");
assertEquals(endpoint.toString(), request.getEndpoint());
assertEquals(5, request.getParameters().size());
// Note that we don't check the field param value explicitly, as field names are
// passed through
// a hash set before being added to the request, and can appear in a
// non-deterministic order.
assertThat(request.getParameters(), hasKey("fields"));
String[] requestFields = Strings.splitStringByCommaToArray(request.getParameters().get("fields"));
assertEquals(new HashSet<>(Arrays.asList(fields)), new HashSet<>(Arrays.asList(requestFields)));
for (Map.Entry<String, String> param : indicesOptionsParams.entrySet()) {
assertThat(request.getParameters(), hasEntry(param.getKey(), param.getValue()));
}
assertNotNull(request.getEntity());
assertToXContentBody(fieldCapabilitiesRequest, request.getEntity());
}
public void testRankEval() throws Exception {
RankEvalSpec spec = new RankEvalSpec(
Collections.singletonList(new RatedRequest("queryId", Collections.emptyList(), new SearchSourceBuilder())),

View File

@ -56,6 +56,12 @@ include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=index-ignore-unavailab
(Optional, boolean) If `true`, unmapped fields are included in the response.
Defaults to `false`.
[[search-field-caps-api-request-body]]
==== {api-request-body-title}
`index_filter`::
(Optional, <<query-dsl,query object>> Allows to filter indices if the provided
query rewrites to `match_none` on every shard.
[[search-field-caps-api-response-body]]
==== {api-response-body-title}
@ -202,3 +208,37 @@ in some indices but not all:
<1> The `rating` field is unmapped` in `index5`.
<2> The `title` field is unmapped` in `index5`.
It is also possible to filter indices with a query:
[source,console]
--------------------------------------------------
POST twitter*/_field_caps?fields=rating
{
"index_filter": {
"range": {
"@timestamp": {
"gte": "2018"
}
}
}
}
--------------------------------------------------
// TEST[setup:twitter]
In which case indices that rewrite the provided filter to `match_none` on every shard
will be filtered from the response.
--
[IMPORTANT]
====
The filtering is done on a best-effort basis, it uses index statistics and mappings
to rewrite queries to `match_none` instead of fully executing the request.
For instance a `range` query over a `date` field can rewrite to `match_none`
if all documents within a shard (including deleted documents) are outside
of the provided range.
However, not all queries can rewrite to `match_none` so this API may return
an index even if the provided filter matches no document.
====
--

View File

@ -30,15 +30,17 @@
- match: {indices.6.name: my_remote_cluster:closed_index}
- match: {indices.6.aliases.0: aliased_closed_index}
- match: {indices.6.attributes.0: closed}
- match: {indices.7.name: my_remote_cluster:field_caps_index_1}
- match: {indices.7.name: my_remote_cluster:field_caps_empty_index}
- match: {indices.7.attributes.0: open}
- match: {indices.8.name: my_remote_cluster:field_caps_index_3}
- match: {indices.8.name: my_remote_cluster:field_caps_index_1}
- match: {indices.8.attributes.0: open}
- match: {indices.9.name: my_remote_cluster:single_doc_index}
- match: {indices.9.name: my_remote_cluster:field_caps_index_3}
- match: {indices.9.attributes.0: open}
- match: {indices.10.name: my_remote_cluster:test_index}
- match: {indices.10.aliases.0: aliased_test_index}
- match: {indices.10.name: my_remote_cluster:single_doc_index}
- match: {indices.10.attributes.0: open}
- match: {indices.11.name: my_remote_cluster:test_index}
- match: {indices.11.aliases.0: aliased_test_index}
- match: {indices.11.attributes.0: open}
- match: {aliases.0.name: my_remote_cluster:aliased_closed_index}
- match: {aliases.0.indices.0: closed_index}
- match: {aliases.1.name: my_remote_cluster:aliased_test_index}

View File

@ -316,7 +316,7 @@
- do:
search:
rest_total_hits_as_int: true
index: my_remote_cluster:aliased_test_index,my_remote_cluster:field_caps_index_1
index: my_remote_cluster:aliased_test_index,my_remote_cluster:field_caps_empty_index
- is_false: num_reduce_phases
- match: {_clusters.total: 1}

View File

@ -72,3 +72,54 @@
fields: [number]
- match: {fields.number.double.searchable: true}
- match: {fields.number.double.aggregatable: true}
---
"Get field caps from remote cluster with index filter":
- skip:
version: " - 7.8.99"
reason: Index filter support was added in 7.9
- do:
indices.create:
index: field_caps_index_4
body:
mappings:
properties:
text:
type: text
keyword:
type: keyword
number:
type: double
geo:
type: geo_point
- do:
index:
index: field_caps_index_4
body: { created_at: "2017-01-02" }
- do:
indices.refresh:
index: [field_caps_index_4]
- do:
field_caps:
index: 'field_caps_index_4,my_remote_cluster:field_*'
fields: [number]
body: { index_filter: { range: { created_at: { lt: 2018 } } } }
- match: {indices: ["field_caps_index_4","my_remote_cluster:field_caps_index_1"]}
- length: {fields.number: 1}
- match: {fields.number.double.searchable: true}
- match: {fields.number.double.aggregatable: true}
- do:
field_caps:
index: 'field_caps_index_4,my_remote_cluster:field_*'
fields: [number]
body: { index_filter: { range: { created_at: { gt: 2019 } } } }
- match: {indices: ["my_remote_cluster:field_caps_index_3"]}
- length: {fields.number: 1}
- match: {fields.number.long.searchable: true}
- match: {fields.number.long.aggregatable: true}

View File

@ -50,7 +50,6 @@
body:
settings:
index:
number_of_shards: 1
number_of_replicas: 0
mappings:
properties:
@ -64,15 +63,18 @@
body:
- '{"index": {"_index": "single_doc_index"}}'
- '{"f1": "remote_cluster", "sort_field": 1, "created_at" : "2016-01-01"}'
- do:
indices.create:
index: field_caps_empty_index
- do:
indices.create:
index: field_caps_index_1
body:
settings:
index.number_of_shards: 1
mappings:
properties:
created_at:
type: date
text:
type: text
keyword:
@ -94,10 +96,10 @@
indices.create:
index: field_caps_index_3
body:
settings:
index.number_of_shards: 1
mappings:
properties:
created_at:
type: date
text:
type: text
keyword:
@ -158,6 +160,20 @@
- '{"index": {"_index": "test_index"}}'
- '{"f1": "remote_cluster", "animal": "chicken", "filter_field": 0}'
- do:
bulk:
refresh: true
body:
# Force all documents to be in the same shard (same routing)
- '{"index": {"_index": "field_caps_index_1", "routing": "foo"}}'
- '{"created_at": "2018-01-05"}'
- '{"index": {"_index": "field_caps_index_1", "routing": "foo"}}'
- '{"created_at": "2017-12-01"}'
- '{"index": {"_index": "field_caps_index_3"}}'
- '{"created_at": "2019-10-01"}'
- '{"index": {"_index": "field_caps_index_3"}}'
- '{"created_at": "2020-01-01"}'
- do:
search:
rest_total_hits_as_int: true

View File

@ -59,6 +59,9 @@
"default":false,
"description":"Indicates whether unmapped fields should be included in the response."
}
},
"body":{
"description":"An index filter specified with the Query DSL"
}
}
}

View File

@ -0,0 +1,124 @@
---
setup:
- do:
indices.create:
index: test-1
body:
mappings:
properties:
timestamp:
type: date
field1:
type: keyword
field2:
type: long
- do:
indices.create:
index: test-2
body:
mappings:
properties:
timestamp:
type: date
field1:
type: long
- do:
indices.create:
index: test-3
- do:
index:
index: test-1
body: { timestamp: "2015-01-02", "field1": "404" }
- do:
index:
index: test-1
body: { timestamp: "2018-10-02", "field1": "404" }
- do:
index:
index: test-2
body: { timestamp: "2019-10-04", "field1": "403" }
- do:
index:
index: test-2
body: { timestamp: "2020-03-04", "field1": "200" }
- do:
index:
index: test-3
body: { timestamp: "2022-01-01", "field1": "500" }
- do:
indices.refresh:
index: [test-1, test-2, test-3]
---
"Field caps with index filter":
- skip:
version: " - 7.8.99"
reason: Index filter support was added in 7.9
- do:
field_caps:
index: test-*
fields: "*"
- match: {indices: ["test-1", "test-2", "test-3"]}
- length: {fields.field1: 3}
- match: {fields.field1.long.searchable: true}
- match: {fields.field1.long.aggregatable: true}
- match: {fields.field1.keyword.searchable: true}
- match: {fields.field1.keyword.aggregatable: true}
- match: {fields.field1.text.searchable: true}
- match: {fields.field1.text.aggregatable: false}
- do:
field_caps:
index: test-*
fields: "*"
body: { index_filter: { range: { timestamp: { gte: 2010 }}}}
- match: {indices: ["test-1", "test-2", "test-3"]}
- length: {fields.field1: 3}
- do:
field_caps:
index: test-*
fields: "*"
body: { index_filter: { range: { timestamp: { gte: 2019 } } } }
- match: {indices: ["test-2", "test-3"]}
- length: {fields.field1: 2}
- match: {fields.field1.long.searchable: true}
- match: {fields.field1.long.aggregatable: true}
- match: {fields.field1.long.indices: ["test-2"]}
- match: {fields.field1.text.searchable: true}
- match: {fields.field1.text.aggregatable: false}
- match: {fields.field1.text.indices: ["test-3"]}
- is_false: fields.field1.indices
- do:
field_caps:
index: test-*
fields: "*"
body: { index_filter: { range: { timestamp: { lt: 2019 } } } }
- match: {indices: ["test-1"]}
- length: {fields.field1: 1}
- match: {fields.field1.keyword.searchable: true}
- match: {fields.field1.keyword.aggregatable: true}
- is_false: fields.field1.indices
- do:
field_caps:
index: test-*
fields: "*"
body: { index_filter: { match_none: {} } }
- match: {indices: []}
- length: {fields: 0}

View File

@ -21,16 +21,20 @@ package org.elasticsearch.search.fieldcaps;
import org.elasticsearch.action.fieldcaps.FieldCapabilities;
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.plugins.MapperPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.junit.Before;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Predicate;
@ -205,6 +209,43 @@ public class FieldCapabilitiesIT extends ESIntegTestCase {
assertEquals(response1, response2);
}
public void testWithIndexFilter() throws InterruptedException {
assertAcked(prepareCreate("index-1").addMapping("_doc", "timestamp", "type=date", "field1", "type=keyword"));
assertAcked(prepareCreate("index-2").addMapping("_doc", "timestamp", "type=date", "field1", "type=long"));
List<IndexRequestBuilder> reqs = new ArrayList<>();
reqs.add(client().prepareIndex("index-1", "_doc").setSource("timestamp", "2015-07-08"));
reqs.add(client().prepareIndex("index-1", "_doc").setSource("timestamp", "2018-07-08"));
reqs.add(client().prepareIndex("index-2", "_doc").setSource("timestamp", "2019-10-12"));
reqs.add(client().prepareIndex("index-2", "_doc").setSource("timestamp", "2020-07-08"));
indexRandom(true, reqs);
FieldCapabilitiesResponse response = client().prepareFieldCaps("index-*").setFields("*").get();
assertIndices(response, "index-1", "index-2");
Map<String, FieldCapabilities> newField = response.getField("field1");
assertEquals(2, newField.size());
assertTrue(newField.containsKey("long"));
assertTrue(newField.containsKey("keyword"));
response = client().prepareFieldCaps("index-*")
.setFields("*")
.setIndexFilter(QueryBuilders.rangeQuery("timestamp").gte("2019-11-01"))
.get();
assertIndices(response, "index-2");
newField = response.getField("field1");
assertEquals(1, newField.size());
assertTrue(newField.containsKey("long"));
response = client().prepareFieldCaps("index-*")
.setFields("*")
.setIndexFilter(QueryBuilders.rangeQuery("timestamp").lte("2017-01-01"))
.get();
assertIndices(response, "index-1");
newField = response.getField("field1");
assertEquals(1, newField.size());
assertTrue(newField.containsKey("keyword"));
}
private void assertIndices(FieldCapabilitiesResponse response, String... indices) {
assertNotNull(response.getIndices());
Arrays.sort(indices);

View File

@ -20,40 +20,59 @@
package org.elasticsearch.action.fieldcaps;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.single.shard.SingleShardRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.shard.ShardId;
import java.io.IOException;
import java.util.Objects;
public class FieldCapabilitiesIndexRequest extends SingleShardRequest<FieldCapabilitiesIndexRequest> {
public class FieldCapabilitiesIndexRequest extends ActionRequest implements IndicesRequest {
public static final IndicesOptions INDICES_OPTIONS = IndicesOptions.strictSingleIndexNoExpandForbidClosed();
private final String index;
private final String[] fields;
private final OriginalIndices originalIndices;
private final QueryBuilder indexFilter;
private final long nowInMillis;
private ShardId shardId;
// For serialization
FieldCapabilitiesIndexRequest(StreamInput in) throws IOException {
super(in);
shardId = in.readOptionalWriteable(ShardId::new);
index = in.readOptionalString();
fields = in.readStringArray();
if (in.getVersion().onOrAfter(Version.V_6_2_0)) {
originalIndices = OriginalIndices.readOriginalIndices(in);
} else {
originalIndices = OriginalIndices.NONE;
}
indexFilter = in.getVersion().onOrAfter(Version.V_7_9_0) ? in.readOptionalNamedWriteable(QueryBuilder.class) : null;
nowInMillis = in.getVersion().onOrAfter(Version.V_7_9_0) ? in.readLong() : 0L;
}
FieldCapabilitiesIndexRequest(String[] fields, String index, OriginalIndices originalIndices) {
super(index);
FieldCapabilitiesIndexRequest(String[] fields,
String index,
OriginalIndices originalIndices,
QueryBuilder indexFilter,
long nowInMillis) {
if (fields == null || fields.length == 0) {
throw new IllegalArgumentException("specified fields can't be null or empty");
}
this.index = Objects.requireNonNull(index);
this.fields = fields;
assert index != null;
this.index(index);
this.originalIndices = originalIndices;
this.indexFilter = indexFilter;
this.nowInMillis = nowInMillis;
}
public String[] fields() {
@ -70,13 +89,40 @@ public class FieldCapabilitiesIndexRequest extends SingleShardRequest<FieldCapab
return originalIndices.indicesOptions();
}
public String index() {
return index;
}
public QueryBuilder indexFilter() {
return indexFilter;
}
public ShardId shardId() {
return shardId;
}
public long nowInMillis() {
return nowInMillis;
}
FieldCapabilitiesIndexRequest shardId(ShardId shardId) {
this.shardId = shardId;
return this;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeOptionalWriteable(shardId);
out.writeOptionalString(index);
out.writeStringArray(fields);
if (out.getVersion().onOrAfter(Version.V_6_2_0)) {
OriginalIndices.writeOriginalIndices(originalIndices, out);
}
if (out.getVersion().onOrAfter(Version.V_7_9_0)) {
out.writeOptionalNamedWriteable(indexFilter);
out.writeLong(nowInMillis);
}
}
@Override

View File

@ -19,6 +19,7 @@
package org.elasticsearch.action.fieldcaps;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
@ -32,19 +33,21 @@ import java.util.Objects;
* Response for {@link TransportFieldCapabilitiesIndexAction}.
*/
public class FieldCapabilitiesIndexResponse extends ActionResponse implements Writeable {
private String indexName;
private Map<String, IndexFieldCapabilities> responseMap;
private final String indexName;
private final Map<String, IndexFieldCapabilities> responseMap;
private final boolean canMatch;
FieldCapabilitiesIndexResponse(String indexName, Map<String, IndexFieldCapabilities> responseMap) {
FieldCapabilitiesIndexResponse(String indexName, Map<String, IndexFieldCapabilities> responseMap, boolean canMatch) {
this.indexName = indexName;
this.responseMap = responseMap;
this.canMatch = canMatch;
}
FieldCapabilitiesIndexResponse(StreamInput in) throws IOException {
super(in);
this.indexName = in.readString();
this.responseMap =
in.readMap(StreamInput::readString, IndexFieldCapabilities::new);
this.responseMap = in.readMap(StreamInput::readString, IndexFieldCapabilities::new);
this.canMatch = in.getVersion().onOrAfter(Version.V_7_9_0) ? in.readBoolean() : true;
}
/**
@ -54,6 +57,10 @@ public class FieldCapabilitiesIndexResponse extends ActionResponse implements Wr
return indexName;
}
public boolean canMatch() {
return canMatch;
}
/**
* Get the field capabilities map
*/
@ -72,8 +79,10 @@ public class FieldCapabilitiesIndexResponse extends ActionResponse implements Wr
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(indexName);
out.writeMap(responseMap,
StreamOutput::writeString, (valueOut, fc) -> fc.writeTo(valueOut));
out.writeMap(responseMap, StreamOutput::writeString, (valueOut, fc) -> fc.writeTo(valueOut));
if (out.getVersion().onOrAfter(Version.V_7_9_0)) {
out.writeBoolean(canMatch);
}
}
@Override
@ -81,12 +90,13 @@ public class FieldCapabilitiesIndexResponse extends ActionResponse implements Wr
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
FieldCapabilitiesIndexResponse that = (FieldCapabilitiesIndexResponse) o;
return Objects.equals(indexName, that.indexName) &&
return canMatch == that.canMatch &&
Objects.equals(indexName, that.indexName) &&
Objects.equals(responseMap, that.responseMap);
}
@Override
public int hashCode() {
return Objects.hash(indexName, responseMap);
return Objects.hash(indexName, responseMap, canMatch);
}
}

View File

@ -25,11 +25,12 @@ import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.ValidateActions;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import java.io.IOException;
import java.util.Arrays;
@ -37,24 +38,17 @@ import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
import static org.elasticsearch.common.xcontent.ObjectParser.fromList;
public final class FieldCapabilitiesRequest extends ActionRequest implements IndicesRequest.Replaceable {
public static final ParseField FIELDS_FIELD = new ParseField("fields");
public final class FieldCapabilitiesRequest extends ActionRequest implements IndicesRequest.Replaceable, ToXContentObject {
public static final String NAME = "field_caps_request";
private String[] indices = Strings.EMPTY_ARRAY;
private IndicesOptions indicesOptions = IndicesOptions.strictExpandOpen();
private String[] fields = Strings.EMPTY_ARRAY;
private boolean includeUnmapped = false;
// pkg private API mainly for cross cluster search to signal that we do multiple reductions ie. the results should not be merged
private boolean mergeResults = true;
private static final ObjectParser<FieldCapabilitiesRequest, Void> PARSER =
new ObjectParser<>(NAME, FieldCapabilitiesRequest::new);
static {
PARSER.declareStringArray(fromList(String.class, FieldCapabilitiesRequest::fields), FIELDS_FIELD);
}
private QueryBuilder indexFilter;
private Long nowInMillis;
public FieldCapabilitiesRequest(StreamInput in) throws IOException {
super(in);
@ -67,13 +61,16 @@ public final class FieldCapabilitiesRequest extends ActionRequest implements Ind
} else {
includeUnmapped = false;
}
indexFilter = in.getVersion().onOrAfter(Version.V_7_9_0) ? in.readOptionalNamedWriteable(QueryBuilder.class) : null;
nowInMillis = in.getVersion().onOrAfter(Version.V_7_9_0) ? in.readOptionalLong() : null;
}
public FieldCapabilitiesRequest() {}
public FieldCapabilitiesRequest() {
}
/**
* Returns <code>true</code> iff the results should be merged.
*
* <p>
* Note that when using the high-level REST client, results are always merged (this flag is always considered 'true').
*/
boolean isMergeResults() {
@ -83,7 +80,7 @@ public final class FieldCapabilitiesRequest extends ActionRequest implements Ind
/**
* If set to <code>true</code> the response will contain only a merged view of the per index field capabilities.
* Otherwise only unmerged per index field capabilities are returned.
*
* <p>
* Note that when using the high-level REST client, results are always merged (this flag is always considered 'true').
*/
void setMergeResults(boolean mergeResults) {
@ -100,6 +97,20 @@ public final class FieldCapabilitiesRequest extends ActionRequest implements Ind
if (out.getVersion().onOrAfter(Version.V_7_2_0)) {
out.writeBoolean(includeUnmapped);
}
if (out.getVersion().onOrAfter(Version.V_7_9_0)) {
out.writeOptionalNamedWriteable(indexFilter);
out.writeOptionalLong(nowInMillis);
}
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
if (indexFilter != null) {
builder.field("index_filter", indexFilter);
}
builder.endObject();
return builder;
}
/**
@ -150,6 +161,26 @@ public final class FieldCapabilitiesRequest extends ActionRequest implements Ind
return includeUnmapped;
}
/**
* Allows to filter indices if the provided {@link QueryBuilder} rewrites to `match_none` on every shard.
*/
public FieldCapabilitiesRequest indexFilter(QueryBuilder indexFilter) {
this.indexFilter = indexFilter;
return this;
}
public QueryBuilder indexFilter() {
return indexFilter;
}
Long nowInMillis() {
return nowInMillis;
}
void nowInMillis(long nowInMillis) {
this.nowInMillis = nowInMillis;
}
@Override
public ActionRequestValidationException validate() {
ActionRequestValidationException validationException = null;
@ -163,17 +194,21 @@ public final class FieldCapabilitiesRequest extends ActionRequest implements Ind
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
FieldCapabilitiesRequest that = (FieldCapabilitiesRequest) o;
return Arrays.equals(indices, that.indices) &&
Objects.equals(indicesOptions, that.indicesOptions) &&
return includeUnmapped == that.includeUnmapped &&
mergeResults == that.mergeResults &&
Arrays.equals(indices, that.indices) &&
indicesOptions.equals(that.indicesOptions) &&
Arrays.equals(fields, that.fields) &&
Objects.equals(mergeResults, that.mergeResults) &&
includeUnmapped == that.includeUnmapped;
Objects.equals(indexFilter, that.indexFilter) &&
Objects.equals(nowInMillis, that.nowInMillis);
}
@Override
public int hashCode() {
return Objects.hash(Arrays.hashCode(indices), indicesOptions, Arrays.hashCode(fields), mergeResults, includeUnmapped);
int result = Objects.hash(indicesOptions, includeUnmapped, mergeResults, indexFilter, nowInMillis);
result = 31 * result + Arrays.hashCode(indices);
result = 31 * result + Arrays.hashCode(fields);
return result;
}
}

View File

@ -21,6 +21,7 @@ package org.elasticsearch.action.fieldcaps;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.index.query.QueryBuilder;
public class FieldCapabilitiesRequestBuilder extends ActionRequestBuilder<FieldCapabilitiesRequest, FieldCapabilitiesResponse> {
public FieldCapabilitiesRequestBuilder(ElasticsearchClient client,
@ -41,4 +42,9 @@ public class FieldCapabilitiesRequestBuilder extends ActionRequestBuilder<FieldC
request().includeUnmapped(includeUnmapped);
return this;
}
public FieldCapabilitiesRequestBuilder setIndexFilter(QueryBuilder indexFilter) {
request().indexFilter(indexFilter);
return this;
}
}

View File

@ -67,6 +67,8 @@ public class TransportFieldCapabilitiesAction extends HandledTransportAction<Fie
@Override
protected void doExecute(Task task, FieldCapabilitiesRequest request, final ActionListener<FieldCapabilitiesResponse> listener) {
// retrieve the initial timestamp in case the action is a cross cluster search
long nowInMillis = request.nowInMillis() == null ? System.currentTimeMillis() : request.nowInMillis();
final ClusterState clusterState = clusterService.state();
final Map<String, OriginalIndices> remoteClusterIndices = remoteClusterService.groupIndices(request.indicesOptions(),
request.indices(), idx -> indexNameExpressionResolver.hasIndexAbstraction(idx, clusterState));
@ -78,26 +80,27 @@ public class TransportFieldCapabilitiesAction extends HandledTransportAction<Fie
} else {
concreteIndices = indexNameExpressionResolver.concreteIndexNames(clusterState, localIndices, true);
}
final String[] allIndices = mergeIndiceNames(concreteIndices, remoteClusterIndices);
final int totalNumRequest = concreteIndices.length + remoteClusterIndices.size();
final CountDown completionCounter = new CountDown(totalNumRequest);
final List<FieldCapabilitiesIndexResponse> indexResponses = Collections.synchronizedList(new ArrayList<>());
final Runnable onResponse = () -> {
if (completionCounter.countDown()) {
if (request.isMergeResults()) {
listener.onResponse(merge(allIndices, indexResponses, request.includeUnmapped()));
listener.onResponse(merge(indexResponses, request.includeUnmapped()));
} else {
listener.onResponse(new FieldCapabilitiesResponse(indexResponses));
}
}
};
if (totalNumRequest == 0) {
listener.onResponse(new FieldCapabilitiesResponse(allIndices, Collections.emptyMap()));
listener.onResponse(new FieldCapabilitiesResponse(new String[0], Collections.emptyMap()));
} else {
ActionListener<FieldCapabilitiesIndexResponse> innerListener = new ActionListener<FieldCapabilitiesIndexResponse>() {
@Override
public void onResponse(FieldCapabilitiesIndexResponse result) {
if (result.canMatch()) {
indexResponses.add(result);
}
onResponse.run();
}
@ -108,7 +111,8 @@ public class TransportFieldCapabilitiesAction extends HandledTransportAction<Fie
}
};
for (String index : concreteIndices) {
shardAction.execute(new FieldCapabilitiesIndexRequest(request.fields(), index, localIndices), innerListener);
shardAction.execute(new FieldCapabilitiesIndexRequest(request.fields(), index, localIndices,
request.indexFilter(), nowInMillis), innerListener);
}
// this is the cross cluster part of this API - we force the other cluster to not merge the results but instead
@ -122,10 +126,12 @@ public class TransportFieldCapabilitiesAction extends HandledTransportAction<Fie
remoteRequest.indicesOptions(originalIndices.indicesOptions());
remoteRequest.indices(originalIndices.indices());
remoteRequest.fields(request.fields());
remoteRequest.indexFilter(request.indexFilter());
remoteRequest.nowInMillis(nowInMillis);
remoteClusterClient.fieldCaps(remoteRequest, ActionListener.wrap(response -> {
for (FieldCapabilitiesIndexResponse res : response.getIndexResponses()) {
indexResponses.add(new FieldCapabilitiesIndexResponse(RemoteClusterAware.
buildRemoteIndexName(clusterAlias, res.getIndexName()), res.get()));
buildRemoteIndexName(clusterAlias, res.getIndexName()), res.get(), res.canMatch()));
}
onResponse.run();
}, failure -> onResponse.run()));
@ -133,19 +139,11 @@ public class TransportFieldCapabilitiesAction extends HandledTransportAction<Fie
}
}
private String[] mergeIndiceNames(String[] localIndices, Map<String, OriginalIndices> remoteIndices) {
Set<String> allIndices = new HashSet<>();
Arrays.stream(localIndices).forEach(allIndices::add);
for (Map.Entry<String, OriginalIndices> entry : remoteIndices.entrySet()) {
for (String index : entry.getValue().indices()) {
allIndices.add(RemoteClusterAware.buildRemoteIndexName(entry.getKey(), index));
}
}
return allIndices.stream().toArray(String[]::new);
}
private FieldCapabilitiesResponse merge(String[] indices, List<FieldCapabilitiesIndexResponse> indexResponses,
boolean includeUnmapped) {
private FieldCapabilitiesResponse merge(List<FieldCapabilitiesIndexResponse> indexResponses, boolean includeUnmapped) {
String[] indices = indexResponses.stream()
.map(FieldCapabilitiesIndexResponse::getIndexName)
.sorted()
.toArray(String[]::new);
final Map<String, Map<String, FieldCapabilities.Builder>> responseMapBuilder = new HashMap<> ();
for (FieldCapabilitiesIndexResponse response : indexResponses) {
innerMerge(responseMapBuilder, response.getIndexName(), response.get());

View File

@ -19,62 +19,99 @@
package org.elasticsearch.action.fieldcaps;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.NoShardAvailableActionException;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.single.shard.TransportSingleShardAction;
import org.elasticsearch.action.support.ChannelActionListener;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.routing.ShardsIterator;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.logging.LoggerMessageFormat;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.ObjectMapper;
import org.elasticsearch.index.query.MatchAllQueryBuilder;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.internal.AliasFilter;
import org.elasticsearch.search.internal.ShardSearchRequest;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequestHandler;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.function.Predicate;
public class TransportFieldCapabilitiesIndexAction extends TransportSingleShardAction<FieldCapabilitiesIndexRequest,
FieldCapabilitiesIndexResponse> {
import static org.elasticsearch.action.support.TransportActions.isShardNotAvailableException;
public class TransportFieldCapabilitiesIndexAction
extends HandledTransportAction<FieldCapabilitiesIndexRequest, FieldCapabilitiesIndexResponse> {
private static final Logger logger = LogManager.getLogger(TransportFieldCapabilitiesIndexAction.class);
private static final String ACTION_NAME = FieldCapabilitiesAction.NAME + "[index]";
private static final String ACTION_SHARD_NAME = ACTION_NAME + "[s]";
public static final ActionType<FieldCapabilitiesIndexResponse> TYPE =
new ActionType<>(ACTION_NAME, FieldCapabilitiesIndexResponse::new);
private final ClusterService clusterService;
private final TransportService transportService;
private final SearchService searchService;
private final IndicesService indicesService;
private final Executor executor;
@Inject
public TransportFieldCapabilitiesIndexAction(ClusterService clusterService, TransportService transportService,
IndicesService indicesService, ThreadPool threadPool, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver) {
super(ACTION_NAME, threadPool, clusterService, transportService, actionFilters, indexNameExpressionResolver,
FieldCapabilitiesIndexRequest::new, ThreadPool.Names.MANAGEMENT);
IndicesService indicesService, SearchService searchService, ThreadPool threadPool,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) {
super(ACTION_NAME, transportService, actionFilters, FieldCapabilitiesIndexRequest::new);
this.clusterService = clusterService;
this.transportService = transportService;
this.searchService = searchService;
this.indicesService = indicesService;
this.executor = threadPool.executor(ThreadPool.Names.MANAGEMENT);
transportService.registerRequestHandler(ACTION_SHARD_NAME, ThreadPool.Names.SAME,
FieldCapabilitiesIndexRequest::new, new ShardTransportHandler());
}
@Override
protected boolean resolveIndex(FieldCapabilitiesIndexRequest request) {
//internal action, index already resolved
return false;
protected void doExecute(Task task, FieldCapabilitiesIndexRequest request, ActionListener<FieldCapabilitiesIndexResponse> listener) {
new AsyncShardsAction(request, listener).start();
}
@Override
protected ShardsIterator shards(ClusterState state, InternalRequest request) {
// Will balance requests between shards
// Resolve patterns and deduplicate
return state.routingTable().index(request.concreteIndex()).randomAllActiveShardsIt();
private FieldCapabilitiesIndexResponse shardOperation(final FieldCapabilitiesIndexRequest request) throws IOException {
if (canMatchShard(request) == false) {
return new FieldCapabilitiesIndexResponse(request.index(), Collections.emptyMap(), false);
}
@Override
protected FieldCapabilitiesIndexResponse shardOperation(final FieldCapabilitiesIndexRequest request, ShardId shardId) {
ShardId shardId = request.shardId();
MapperService mapperService = indicesService.indexServiceSafe(shardId.getIndex()).mapperService();
Set<String> fieldNames = new HashSet<>();
for (String field : request.fields()) {
@ -114,16 +151,160 @@ public class TransportFieldCapabilitiesIndexAction extends TransportSingleShardA
}
}
}
return new FieldCapabilitiesIndexResponse(shardId.getIndexName(), responseMap);
return new FieldCapabilitiesIndexResponse(request.index(), responseMap, true);
}
private boolean canMatchShard(FieldCapabilitiesIndexRequest req) throws IOException {
if (req.indexFilter() == null || req.indexFilter() instanceof MatchAllQueryBuilder) {
return true;
}
assert req.nowInMillis() != 0L;
ShardSearchRequest searchRequest = new ShardSearchRequest(req.shardId(), null, req.nowInMillis(), AliasFilter.EMPTY);
searchRequest.source(new SearchSourceBuilder().query(req.indexFilter()));
return searchService.canMatch(searchRequest).canMatch();
}
private ClusterBlockException checkGlobalBlock(ClusterState state) {
return state.blocks().globalBlockedException(ClusterBlockLevel.READ);
}
private ClusterBlockException checkRequestBlock(ClusterState state, String concreteIndex) {
return state.blocks().indexBlockedException(ClusterBlockLevel.READ, concreteIndex);
}
/**
* An action that executes on each shard sequentially until it finds one that can match the provided
* {@link FieldCapabilitiesIndexRequest#indexFilter()}. In which case the shard is used
* to create the final {@link FieldCapabilitiesIndexResponse}.
*/
class AsyncShardsAction {
private final FieldCapabilitiesIndexRequest request;
private final DiscoveryNodes nodes;
private final ActionListener<FieldCapabilitiesIndexResponse> listener;
private final GroupShardsIterator<ShardIterator> shardsIt;
private volatile int shardIndex = 0;
private AsyncShardsAction(FieldCapabilitiesIndexRequest request, ActionListener<FieldCapabilitiesIndexResponse> listener) {
this.listener = listener;
ClusterState clusterState = clusterService.state();
if (logger.isTraceEnabled()) {
logger.trace("executing [{}] based on cluster state version [{}]", request, clusterState.version());
}
nodes = clusterState.nodes();
ClusterBlockException blockException = checkGlobalBlock(clusterState);
if (blockException != null) {
throw blockException;
}
this.request = request;
blockException = checkRequestBlock(clusterState, request.index());
if (blockException != null) {
throw blockException;
}
shardsIt = clusterService.operationRouting().searchShards(clusterService.state(),
new String[]{request.index()}, null, null, null, null);
}
public void start() {
tryNext(null, true);
}
private void onFailure(ShardRouting shardRouting, Exception e) {
if (e != null) {
logger.trace(() -> new ParameterizedMessage("{}: failed to execute [{}]", shardRouting, request), e);
}
tryNext(e, false);
}
private ShardRouting nextRoutingOrNull() {
if (shardsIt.size() == 0 || shardIndex >= shardsIt.size()) {
return null;
}
ShardRouting next = shardsIt.get(shardIndex).nextOrNull();
if (next != null) {
return next;
}
moveToNextShard();
return nextRoutingOrNull();
}
private void moveToNextShard() {
++ shardIndex;
}
private void tryNext(@Nullable final Exception lastFailure, boolean canMatchShard) {
ShardRouting shardRouting = nextRoutingOrNull();
if (shardRouting == null) {
if (canMatchShard == false) {
listener.onResponse(new FieldCapabilitiesIndexResponse(request.index(), Collections.emptyMap(), false));
} else {
if (lastFailure == null || isShardNotAvailableException(lastFailure)) {
listener.onFailure(new NoShardAvailableActionException(null,
LoggerMessageFormat.format("No shard available for [{}]", request), lastFailure));
} else {
logger.debug(() -> new ParameterizedMessage("{}: failed to execute [{}]", null, request), lastFailure);
listener.onFailure(lastFailure);
}
}
return;
}
DiscoveryNode node = nodes.get(shardRouting.currentNodeId());
if (node == null) {
onFailure(shardRouting, new NoShardAvailableActionException(shardRouting.shardId()));
} else {
request.shardId(shardRouting.shardId());
if (logger.isTraceEnabled()) {
logger.trace(
"sending request [{}] on node [{}]",
request,
node
);
}
transportService.sendRequest(node, ACTION_SHARD_NAME, request,
new TransportResponseHandler<FieldCapabilitiesIndexResponse>() {
@Override
public FieldCapabilitiesIndexResponse read(StreamInput in) throws IOException {
return new FieldCapabilitiesIndexResponse(in);
}
@Override
protected Writeable.Reader<FieldCapabilitiesIndexResponse> getResponseReader() {
return FieldCapabilitiesIndexResponse::new;
public String executor() {
return ThreadPool.Names.SAME;
}
@Override
protected ClusterBlockException checkRequestBlock(ClusterState state, InternalRequest request) {
return state.blocks().indexBlockedException(ClusterBlockLevel.METADATA_READ, request.concreteIndex());
public void handleResponse(final FieldCapabilitiesIndexResponse response) {
if (response.canMatch()) {
listener.onResponse(response);
} else {
moveToNextShard();
tryNext(null, false);
}
}
@Override
public void handleException(TransportException exp) {
onFailure(shardRouting, exp);
}
});
}
}
}
private class ShardTransportHandler implements TransportRequestHandler<FieldCapabilitiesIndexRequest> {
@Override
public void messageReceived(final FieldCapabilitiesIndexRequest request,
final TransportChannel channel,
Task task) throws Exception {
if (logger.isTraceEnabled()) {
logger.trace("executing [{}]", request);
}
ActionListener<FieldCapabilitiesIndexResponse> listener = new ChannelActionListener<>(channel, ACTION_SHARD_NAME, request);
executor.execute(ActionRunnable.supply(listener, () -> shardOperation(request)));
}
}
}

View File

@ -205,7 +205,11 @@ public class RestActions {
}
public static QueryBuilder getQueryContent(XContentParser requestParser) {
return parseTopLevelQueryBuilder(requestParser);
return parseTopLevelQueryBuilder("query", requestParser);
}
public static QueryBuilder getQueryContent(String fieldName, XContentParser requestParser) {
return parseTopLevelQueryBuilder(fieldName, requestParser);
}
/**
@ -238,7 +242,7 @@ public class RestActions {
/**
* Parses a top level query including the query element that wraps it
*/
private static QueryBuilder parseTopLevelQueryBuilder(XContentParser parser) {
private static QueryBuilder parseTopLevelQueryBuilder(String fieldName, XContentParser parser) {
try {
QueryBuilder queryBuilder = null;
XContentParser.Token first = parser.nextToken();
@ -252,8 +256,8 @@ public class RestActions {
}
for (XContentParser.Token token = parser.nextToken(); token != XContentParser.Token.END_OBJECT; token = parser.nextToken()) {
if (token == XContentParser.Token.FIELD_NAME) {
String fieldName = parser.currentName();
if ("query".equals(fieldName)) {
String currentName = parser.currentName();
if (fieldName.equals(currentName)) {
queryBuilder = parseInnerQueryBuilder(parser);
} else {
throw new ParsingException(parser.getTokenLocation(), "request does not support [" + parser.currentName() + "]");

View File

@ -61,6 +61,11 @@ public class RestFieldCapabilitiesAction extends BaseRestHandler {
fieldRequest.indicesOptions(
IndicesOptions.fromRequest(request, fieldRequest.indicesOptions()));
fieldRequest.includeUnmapped(request.paramAsBoolean("include_unmapped", false));
request.withContentOrSourceParamParserOrNull(parser -> {
if (parser != null) {
fieldRequest.indexFilter(RestActions.getQueryContent("index_filter", parser));
}
});
return channel -> client.fieldCaps(fieldRequest, new RestToXContentListener<>(channel));
}
}

View File

@ -122,8 +122,8 @@ public class ShardSearchRequest extends TransportRequest implements IndicesReque
String[] types,
long nowInMillis,
AliasFilter aliasFilter) {
this(OriginalIndices.NONE, shardId, -1, null, null, types, null,
aliasFilter, 1.0f, false, Strings.EMPTY_ARRAY, null, null, nowInMillis, null);
this(OriginalIndices.NONE, shardId, -1, SearchType.QUERY_THEN_FETCH, null, types,
null, aliasFilter, 1.0f, false, Strings.EMPTY_ARRAY, null, null, nowInMillis, null);
}
private ShardSearchRequest(OriginalIndices originalIndices,

View File

@ -57,7 +57,7 @@ public class FieldCapabilitiesResponseTests extends AbstractWireSerializingTestC
for (String field : fields) {
responses.put(field, randomFieldCaps(field));
}
return new FieldCapabilitiesIndexResponse(randomAsciiLettersOfLength(10), responses);
return new FieldCapabilitiesIndexResponse(randomAsciiLettersOfLength(10), responses, randomBoolean());
}
private static IndexFieldCapabilities randomFieldCaps(String fieldName) {