Fix `_field_caps` serialization in order to support cross cluster search (#24722)
Today the `_field_caps` API doesn't implement its request serialization correctly since indices and indices options are not serialized at all. This will likely break with all transport clients etc. and if this request must be send across the network. This commit fixes this and adds correct handling if we have only remote indices to prevent the inclusion of all local indices.
This commit is contained in:
parent
5fc6f17121
commit
cf846af0e5
|
@ -24,6 +24,7 @@ import org.elasticsearch.common.io.stream.StreamInput;
|
|||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
|
||||
/**
|
||||
* Used to keep track of original indices within internal (e.g. shard level) requests
|
||||
|
@ -64,4 +65,12 @@ public final class OriginalIndices implements IndicesRequest {
|
|||
out.writeStringArrayNullable(originalIndices.indices);
|
||||
originalIndices.indicesOptions.writeIndicesOptions(out);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "OriginalIndices{" +
|
||||
"indices=" + Arrays.toString(indices) +
|
||||
", indicesOptions=" + indicesOptions +
|
||||
'}';
|
||||
}
|
||||
}
|
||||
|
|
|
@ -35,6 +35,7 @@ import org.elasticsearch.common.xcontent.XContentParser;
|
|||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashSet;
|
||||
import java.util.Objects;
|
||||
import java.util.Set;
|
||||
|
||||
import static org.elasticsearch.common.xcontent.ObjectParser.fromList;
|
||||
|
@ -78,6 +79,8 @@ public final class FieldCapabilitiesRequest extends ActionRequest implements Ind
|
|||
super.readFrom(in);
|
||||
fields = in.readStringArray();
|
||||
if (in.getVersion().onOrAfter(Version.V_5_5_0_UNRELEASED)) {
|
||||
indices = in.readStringArray();
|
||||
indicesOptions = IndicesOptions.readIndicesOptions(in);
|
||||
mergeResults = in.readBoolean();
|
||||
} else {
|
||||
mergeResults = true;
|
||||
|
@ -89,6 +92,8 @@ public final class FieldCapabilitiesRequest extends ActionRequest implements Ind
|
|||
super.writeTo(out);
|
||||
out.writeStringArray(fields);
|
||||
if (out.getVersion().onOrAfter(Version.V_5_5_0_UNRELEASED)) {
|
||||
out.writeStringArray(indices);
|
||||
indicesOptions.writeIndicesOptions(out);
|
||||
out.writeBoolean(mergeResults);
|
||||
}
|
||||
}
|
||||
|
@ -118,12 +123,12 @@ public final class FieldCapabilitiesRequest extends ActionRequest implements Ind
|
|||
* The list of indices to lookup
|
||||
*/
|
||||
public FieldCapabilitiesRequest indices(String... indices) {
|
||||
this.indices = indices;
|
||||
this.indices = Objects.requireNonNull(indices, "indices must not be null");
|
||||
return this;
|
||||
}
|
||||
|
||||
public FieldCapabilitiesRequest indicesOptions(IndicesOptions indicesOptions) {
|
||||
this.indicesOptions = indicesOptions;
|
||||
this.indicesOptions = Objects.requireNonNull(indicesOptions, "indices options must not be null");
|
||||
return this;
|
||||
}
|
||||
|
||||
|
|
|
@ -26,6 +26,7 @@ import org.elasticsearch.action.support.HandledTransportAction;
|
|||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.util.concurrent.CountDown;
|
||||
|
@ -72,7 +73,14 @@ public class TransportFieldCapabilitiesAction extends HandledTransportAction<Fie
|
|||
final Map<String, OriginalIndices> remoteClusterIndices = remoteClusterService.groupIndices(request.indicesOptions(),
|
||||
request.indices(), idx -> indexNameExpressionResolver.hasIndexOrAlias(idx, clusterState));
|
||||
final OriginalIndices localIndices = remoteClusterIndices.remove(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY);
|
||||
final String[] concreteIndices = indexNameExpressionResolver.concreteIndexNames(clusterState, localIndices);
|
||||
final String[] concreteIndices;
|
||||
if (remoteClusterIndices.isEmpty() == false && localIndices.indices().length == 0) {
|
||||
// in the case we have one or more remote indices but no local we don't expand to all local indices and just do remote
|
||||
// indices
|
||||
concreteIndices = Strings.EMPTY_ARRAY;
|
||||
} else {
|
||||
concreteIndices = indexNameExpressionResolver.concreteIndexNames(clusterState, localIndices);
|
||||
}
|
||||
final int totalNumRequest = concreteIndices.length + remoteClusterIndices.size();
|
||||
final CountDown completionCounter = new CountDown(totalNumRequest);
|
||||
final List<FieldCapabilitiesIndexResponse> indexResponses = Collections.synchronizedList(new ArrayList<>());
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
|
||||
package org.elasticsearch.action.fieldcaps;
|
||||
|
||||
import org.elasticsearch.action.support.IndicesOptions;
|
||||
import org.elasticsearch.common.io.stream.BytesStreamOutput;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
|
@ -33,10 +34,52 @@ public class FieldCapabilitiesRequestTests extends ESTestCase {
|
|||
for (int i = 0; i < size; i++) {
|
||||
randomFields[i] = randomAlphaOfLengthBetween(5, 10);
|
||||
}
|
||||
|
||||
size = randomIntBetween(0, 20);
|
||||
String[] randomIndices = new String[size];
|
||||
for (int i = 0; i < size; i++) {
|
||||
randomIndices[i] = randomAlphaOfLengthBetween(5, 10);
|
||||
}
|
||||
request.fields(randomFields);
|
||||
request.indices(randomIndices);
|
||||
if (randomBoolean()) {
|
||||
request.indicesOptions(randomBoolean() ? IndicesOptions.strictExpand() : IndicesOptions.lenientExpandOpen());
|
||||
}
|
||||
return request;
|
||||
}
|
||||
|
||||
public void testEqualsAndHashcode() {
|
||||
FieldCapabilitiesRequest request = new FieldCapabilitiesRequest();
|
||||
request.indices("foo");
|
||||
request.indicesOptions(IndicesOptions.lenientExpandOpen());
|
||||
request.fields("bar");
|
||||
|
||||
FieldCapabilitiesRequest other = new FieldCapabilitiesRequest();
|
||||
other.indices("foo");
|
||||
other.indicesOptions(IndicesOptions.lenientExpandOpen());
|
||||
other.fields("bar");
|
||||
assertEquals(request, request);
|
||||
assertEquals(request, other);
|
||||
assertEquals(request.hashCode(), other.hashCode());
|
||||
|
||||
// change indices
|
||||
other.indices("foo", "bar");
|
||||
assertNotEquals(request, other);
|
||||
other.indices("foo");
|
||||
assertEquals(request, other);
|
||||
|
||||
// change fields
|
||||
other.fields("foo", "bar");
|
||||
assertNotEquals(request, other);
|
||||
other.fields("bar");
|
||||
assertEquals(request, request);
|
||||
|
||||
// change indices options
|
||||
other.indicesOptions(IndicesOptions.strictExpand());
|
||||
assertNotEquals(request, other);
|
||||
|
||||
}
|
||||
|
||||
public void testFieldCapsRequestSerialization() throws IOException {
|
||||
for (int i = 0; i < 20; i++) {
|
||||
FieldCapabilitiesRequest request = randomRequest();
|
||||
|
|
|
@ -64,3 +64,16 @@
|
|||
- match: {fields.geo.keyword.indices: ["my_remote_cluster:field_caps_index_3"]}
|
||||
- is_false: fields.geo.keyword.non_searchable_indices
|
||||
- is_false: fields.geo.keyword.on_aggregatable_indices
|
||||
|
||||
- do:
|
||||
field_caps:
|
||||
index: 'my_remote_cluster:some_index_that_doesnt_exist'
|
||||
fields: [number]
|
||||
- match: { 'fields': {} } # empty response - this index doesn't exists
|
||||
|
||||
- do:
|
||||
field_caps:
|
||||
index: 'my_remote_cluster:field_caps_index_1'
|
||||
fields: [number]
|
||||
- match: {fields.number.double.searchable: true}
|
||||
- match: {fields.number.double.aggregatable: true}
|
||||
|
|
Loading…
Reference in New Issue