Add raw sort values to SearchSortValues transport serialization (#36617)
In order for CCS alternate execution mode (see #32125) to be able to do the final reduction step on the CCS coordinating node, we need to serialize additional info in the transport layer as part of each `SearchHit`. Sort values are already present but they are formatted according to the provided `DocValueFormat` provided. The CCS node needs to be able to reconstruct the lucene `FieldDoc` to include in the `TopFieldDocs` and `CollapseTopFieldDocs` which will feed the `mergeTopDocs` method used to reduce multiple search responses (one per cluster) into one. This commit adds such information to the `SearchSortValues` and exposes it through a new getter method added to `SearchHit` for retrieval. This info is only serialized at transport and never printed out at REST.
This commit is contained in:
parent
7b9ca62174
commit
b57e12aa44
|
@ -19,16 +19,6 @@
|
|||
|
||||
package org.elasticsearch.search;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
|
||||
import org.apache.lucene.search.Explanation;
|
||||
import org.elasticsearch.ElasticsearchParseException;
|
||||
import org.elasticsearch.action.OriginalIndices;
|
||||
|
@ -61,6 +51,16 @@ import org.elasticsearch.search.fetch.subphase.highlight.HighlightField;
|
|||
import org.elasticsearch.search.lookup.SourceLookup;
|
||||
import org.elasticsearch.transport.RemoteClusterAware;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
|
||||
import static java.util.Collections.emptyMap;
|
||||
import static java.util.Collections.singletonMap;
|
||||
import static java.util.Collections.unmodifiableMap;
|
||||
|
@ -311,10 +311,17 @@ public final class SearchHit implements Streamable, ToXContentObject, Iterable<D
|
|||
}
|
||||
|
||||
/**
|
||||
* An array of the sort values used.
|
||||
* An array of the (formatted) sort values used.
|
||||
*/
|
||||
public Object[] getSortValues() {
|
||||
return sortValues.sortValues();
|
||||
return sortValues.getFormattedSortValues();
|
||||
}
|
||||
|
||||
/**
|
||||
* An array of the (raw) sort values used.
|
||||
*/
|
||||
public Object[] getRawSortValues() {
|
||||
return sortValues.getRawSortValues();
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -20,9 +20,11 @@
|
|||
package org.elasticsearch.search;
|
||||
|
||||
import org.apache.lucene.util.BytesRef;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.io.stream.Writeable;
|
||||
import org.elasticsearch.common.lucene.Lucene;
|
||||
import org.elasticsearch.common.xcontent.ToXContentFragment;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
|
@ -35,101 +37,56 @@ import java.util.Objects;
|
|||
|
||||
public class SearchSortValues implements ToXContentFragment, Writeable {
|
||||
|
||||
static final SearchSortValues EMPTY = new SearchSortValues(new Object[0]);
|
||||
private final Object[] sortValues;
|
||||
private static final Object[] EMPTY_ARRAY = new Object[0];
|
||||
static final SearchSortValues EMPTY = new SearchSortValues(EMPTY_ARRAY);
|
||||
|
||||
private final Object[] formattedSortValues;
|
||||
private final Object[] rawSortValues;
|
||||
|
||||
SearchSortValues(Object[] sortValues) {
|
||||
this.sortValues = Objects.requireNonNull(sortValues, "sort values must not be empty");
|
||||
this.formattedSortValues = Objects.requireNonNull(sortValues, "sort values must not be empty");
|
||||
this.rawSortValues = EMPTY_ARRAY;
|
||||
}
|
||||
|
||||
public SearchSortValues(Object[] sortValues, DocValueFormat[] sortValueFormats) {
|
||||
Objects.requireNonNull(sortValues);
|
||||
public SearchSortValues(Object[] rawSortValues, DocValueFormat[] sortValueFormats) {
|
||||
Objects.requireNonNull(rawSortValues);
|
||||
Objects.requireNonNull(sortValueFormats);
|
||||
this.sortValues = Arrays.copyOf(sortValues, sortValues.length);
|
||||
for (int i = 0; i < sortValues.length; ++i) {
|
||||
if (this.sortValues[i] instanceof BytesRef) {
|
||||
this.sortValues[i] = sortValueFormats[i].format((BytesRef) sortValues[i]);
|
||||
if (rawSortValues.length != sortValueFormats.length) {
|
||||
throw new IllegalArgumentException("formattedSortValues and sortValueFormats must hold the same number of items");
|
||||
}
|
||||
this.rawSortValues = rawSortValues;
|
||||
this.formattedSortValues = Arrays.copyOf(rawSortValues, rawSortValues.length);
|
||||
for (int i = 0; i < rawSortValues.length; ++i) {
|
||||
//we currently format only BytesRef but we may want to change that in the future
|
||||
Object sortValue = rawSortValues[i];
|
||||
if (sortValue instanceof BytesRef) {
|
||||
this.formattedSortValues[i] = sortValueFormats[i].format((BytesRef) sortValue);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public SearchSortValues(StreamInput in) throws IOException {
|
||||
int size = in.readVInt();
|
||||
if (size > 0) {
|
||||
sortValues = new Object[size];
|
||||
for (int i = 0; i < sortValues.length; i++) {
|
||||
byte type = in.readByte();
|
||||
if (type == 0) {
|
||||
sortValues[i] = null;
|
||||
} else if (type == 1) {
|
||||
sortValues[i] = in.readString();
|
||||
} else if (type == 2) {
|
||||
sortValues[i] = in.readInt();
|
||||
} else if (type == 3) {
|
||||
sortValues[i] = in.readLong();
|
||||
} else if (type == 4) {
|
||||
sortValues[i] = in.readFloat();
|
||||
} else if (type == 5) {
|
||||
sortValues[i] = in.readDouble();
|
||||
} else if (type == 6) {
|
||||
sortValues[i] = in.readByte();
|
||||
} else if (type == 7) {
|
||||
sortValues[i] = in.readShort();
|
||||
} else if (type == 8) {
|
||||
sortValues[i] = in.readBoolean();
|
||||
} else {
|
||||
throw new IOException("Can't match type [" + type + "]");
|
||||
}
|
||||
}
|
||||
SearchSortValues(StreamInput in) throws IOException {
|
||||
this.formattedSortValues = in.readArray(Lucene::readSortValue, Object[]::new);
|
||||
if (in.getVersion().onOrAfter(Version.V_7_0_0)) {
|
||||
this.rawSortValues = in.readArray(Lucene::readSortValue, Object[]::new);
|
||||
} else {
|
||||
sortValues = new Object[0];
|
||||
this.rawSortValues = EMPTY_ARRAY;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
out.writeVInt(sortValues.length);
|
||||
for (Object sortValue : sortValues) {
|
||||
if (sortValue == null) {
|
||||
out.writeByte((byte) 0);
|
||||
} else {
|
||||
Class type = sortValue.getClass();
|
||||
if (type == String.class) {
|
||||
out.writeByte((byte) 1);
|
||||
out.writeString((String) sortValue);
|
||||
} else if (type == Integer.class) {
|
||||
out.writeByte((byte) 2);
|
||||
out.writeInt((Integer) sortValue);
|
||||
} else if (type == Long.class) {
|
||||
out.writeByte((byte) 3);
|
||||
out.writeLong((Long) sortValue);
|
||||
} else if (type == Float.class) {
|
||||
out.writeByte((byte) 4);
|
||||
out.writeFloat((Float) sortValue);
|
||||
} else if (type == Double.class) {
|
||||
out.writeByte((byte) 5);
|
||||
out.writeDouble((Double) sortValue);
|
||||
} else if (type == Byte.class) {
|
||||
out.writeByte((byte) 6);
|
||||
out.writeByte((Byte) sortValue);
|
||||
} else if (type == Short.class) {
|
||||
out.writeByte((byte) 7);
|
||||
out.writeShort((Short) sortValue);
|
||||
} else if (type == Boolean.class) {
|
||||
out.writeByte((byte) 8);
|
||||
out.writeBoolean((Boolean) sortValue);
|
||||
} else {
|
||||
throw new IOException("Can't handle sort field value of type [" + type + "]");
|
||||
}
|
||||
}
|
||||
out.writeArray(Lucene::writeSortValue, this.formattedSortValues);
|
||||
if (out.getVersion().onOrAfter(Version.V_7_0_0)) {
|
||||
out.writeArray(Lucene::writeSortValue, this.rawSortValues);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
if (sortValues.length > 0) {
|
||||
if (formattedSortValues.length > 0) {
|
||||
builder.startArray(Fields.SORT);
|
||||
for (Object sortValue : sortValues) {
|
||||
for (Object sortValue : formattedSortValues) {
|
||||
builder.value(sortValue);
|
||||
}
|
||||
builder.endArray();
|
||||
|
@ -142,24 +99,37 @@ public class SearchSortValues implements ToXContentFragment, Writeable {
|
|||
return new SearchSortValues(parser.list().toArray());
|
||||
}
|
||||
|
||||
public Object[] sortValues() {
|
||||
return sortValues;
|
||||
/**
|
||||
* Returns the formatted version of the values that sorting was performed against
|
||||
*/
|
||||
public Object[] getFormattedSortValues() {
|
||||
return formattedSortValues;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the raw version of the values that sorting was performed against
|
||||
*/
|
||||
public Object[] getRawSortValues() {
|
||||
return rawSortValues;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object obj) {
|
||||
if (this == obj) {
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) {
|
||||
return true;
|
||||
}
|
||||
if (obj == null || getClass() != obj.getClass()) {
|
||||
if (o == null || getClass() != o.getClass()) {
|
||||
return false;
|
||||
}
|
||||
SearchSortValues other = (SearchSortValues) obj;
|
||||
return Arrays.equals(sortValues, other.sortValues);
|
||||
SearchSortValues that = (SearchSortValues) o;
|
||||
return Arrays.equals(formattedSortValues, that.formattedSortValues) &&
|
||||
Arrays.equals(rawSortValues, that.rawSortValues);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Arrays.hashCode(sortValues);
|
||||
int result = Arrays.hashCode(formattedSortValues);
|
||||
result = 31 * result + Arrays.hashCode(rawSortValues);
|
||||
return result;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -531,24 +531,26 @@ public class LuceneTests extends ESTestCase {
|
|||
}
|
||||
|
||||
public static Object randomSortValue() {
|
||||
switch(randomIntBetween(0, 8)) {
|
||||
switch(randomIntBetween(0, 9)) {
|
||||
case 0:
|
||||
return randomAlphaOfLengthBetween(3, 10);
|
||||
return null;
|
||||
case 1:
|
||||
return randomInt();
|
||||
return randomAlphaOfLengthBetween(3, 10);
|
||||
case 2:
|
||||
return randomLong();
|
||||
return randomInt();
|
||||
case 3:
|
||||
return randomFloat();
|
||||
return randomLong();
|
||||
case 4:
|
||||
return randomDouble();
|
||||
return randomFloat();
|
||||
case 5:
|
||||
return randomByte();
|
||||
return randomDouble();
|
||||
case 6:
|
||||
return randomShort();
|
||||
return randomByte();
|
||||
case 7:
|
||||
return randomBoolean();
|
||||
return randomShort();
|
||||
case 8:
|
||||
return randomBoolean();
|
||||
case 9:
|
||||
return new BytesRef(randomAlphaOfLengthBetween(3, 10));
|
||||
default:
|
||||
throw new UnsupportedOperationException();
|
||||
|
|
|
@ -22,6 +22,7 @@ package org.elasticsearch.search;
|
|||
import org.apache.lucene.util.BytesRef;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.Writeable;
|
||||
import org.elasticsearch.common.lucene.LuceneTests;
|
||||
import org.elasticsearch.common.xcontent.ToXContent;
|
||||
|
@ -31,23 +32,36 @@ import org.elasticsearch.common.xcontent.XContentType;
|
|||
import org.elasticsearch.common.xcontent.json.JsonXContent;
|
||||
import org.elasticsearch.test.AbstractSerializingTestCase;
|
||||
import org.elasticsearch.test.RandomObjects;
|
||||
import org.elasticsearch.test.VersionUtils;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.Base64;
|
||||
|
||||
public class SearchSortValuesTests extends AbstractSerializingTestCase<SearchSortValues> {
|
||||
|
||||
public static SearchSortValues createTestItem(XContentType xContentType, boolean transportSerialization) {
|
||||
int size = randomIntBetween(1, 20);
|
||||
Object[] values = new Object[size];
|
||||
DocValueFormat[] sortValueFormats = new DocValueFormat[size];
|
||||
for (int i = 0; i < size; i++) {
|
||||
Object sortValue = randomSortValue(xContentType, transportSerialization);
|
||||
values[i] = sortValue;
|
||||
//make sure that for BytesRef, we provide a specific doc value format that overrides format(BytesRef)
|
||||
sortValueFormats[i] = sortValue instanceof BytesRef ? DocValueFormat.RAW : randomDocValueFormat();
|
||||
if (transportSerialization) {
|
||||
DocValueFormat[] sortValueFormats = new DocValueFormat[size];
|
||||
for (int i = 0; i < size; i++) {
|
||||
Object sortValue = randomSortValue(xContentType, transportSerialization);
|
||||
values[i] = sortValue;
|
||||
//make sure that for BytesRef, we provide a specific doc value format that overrides format(BytesRef)
|
||||
sortValueFormats[i] = sortValue instanceof BytesRef ? DocValueFormat.RAW : randomDocValueFormat();
|
||||
}
|
||||
return new SearchSortValues(values, sortValueFormats);
|
||||
} else {
|
||||
//xcontent serialization doesn't write/parse the raw sort values, only the formatted ones
|
||||
for (int i = 0; i < size; i++) {
|
||||
Object sortValue = randomSortValue(xContentType, transportSerialization);
|
||||
//make sure that BytesRef are not provided as formatted values
|
||||
sortValue = sortValue instanceof BytesRef ? DocValueFormat.RAW.format((BytesRef)sortValue) : sortValue;
|
||||
values[i] = sortValue;
|
||||
}
|
||||
return new SearchSortValues(values);
|
||||
}
|
||||
return new SearchSortValues(values, sortValueFormats);
|
||||
}
|
||||
|
||||
private static Object randomSortValue(XContentType xContentType, boolean transportSerialization) {
|
||||
|
@ -79,7 +93,7 @@ public class SearchSortValuesTests extends AbstractSerializingTestCase<SearchSor
|
|||
|
||||
@Override
|
||||
protected SearchSortValues createTestInstance() {
|
||||
return createTestItem(randomFrom(XContentType.values()), true);
|
||||
return createTestItem(randomFrom(XContentType.values()), randomBoolean());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -113,20 +127,32 @@ public class SearchSortValuesTests extends AbstractSerializingTestCase<SearchSor
|
|||
|
||||
@Override
|
||||
protected SearchSortValues mutateInstance(SearchSortValues instance) {
|
||||
Object[] sortValues = instance.sortValues();
|
||||
if (sortValues.length == 0) {
|
||||
return createTestInstance();
|
||||
}
|
||||
Object[] sortValues = instance.getFormattedSortValues();
|
||||
if (randomBoolean()) {
|
||||
return new SearchSortValues(new Object[0]);
|
||||
}
|
||||
Object[] values = Arrays.copyOf(sortValues, sortValues.length + 1);
|
||||
values[sortValues.length] = randomSortValue(randomFrom(XContentType.values()), true);
|
||||
values[sortValues.length] = randomSortValue(randomFrom(XContentType.values()), randomBoolean());
|
||||
return new SearchSortValues(values);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected SearchSortValues copyInstance(SearchSortValues instance, Version version) {
|
||||
return new SearchSortValues(Arrays.copyOf(instance.sortValues(), instance.sortValues().length));
|
||||
//TODO rename and update version after backport
|
||||
public void testSerializationPre70() throws IOException {
|
||||
Version version = VersionUtils.randomVersionBetween(random(), Version.V_6_0_0, VersionUtils.getPreviousVersion(Version.V_7_0_0));
|
||||
SearchSortValues original = createTestInstance();
|
||||
SearchSortValues deserialized = copyInstance(original, version);
|
||||
assertArrayEquals(original.getFormattedSortValues(), deserialized.getFormattedSortValues());
|
||||
assertEquals(0, deserialized.getRawSortValues().length);
|
||||
}
|
||||
|
||||
//TODO rename method and adapt versions after backport
|
||||
public void testReadFromPre70() throws IOException {
|
||||
try (StreamInput in = StreamInput.wrap(Base64.getDecoder().decode("AwIAAAABAQEyBUAIAAAAAAAAAAAAAAAA"))) {
|
||||
in.setVersion(VersionUtils.randomVersionBetween(random(), Version.V_6_0_0, VersionUtils.getPreviousVersion(Version.V_7_0_0)));
|
||||
SearchSortValues deserialized = new SearchSortValues(in);
|
||||
SearchSortValues expected = new SearchSortValues(new Object[]{1, "2", 3d});
|
||||
assertEquals(expected, deserialized);
|
||||
assertEquals(0, deserialized.getRawSortValues().length);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -27,6 +27,6 @@ public abstract class AbstractWireSerializingTestCase<T extends Writeable> exten
|
|||
|
||||
@Override
|
||||
protected T copyInstance(T instance, Version version) throws IOException {
|
||||
return copyWriteable(instance, getNamedWriteableRegistry(), instanceReader());
|
||||
return copyWriteable(instance, getNamedWriteableRegistry(), instanceReader(), version);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue