Cut over SearchPhaseResult to Writeable (#41853)

Relates to #34389
This commit is contained in:
Luca Cavanna 2019-05-08 11:00:59 +02:00
parent c85f285298
commit 1ded45b0a2
10 changed files with 61 additions and 137 deletions

View File

@ -19,11 +19,13 @@
package org.elasticsearch.search;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.search.fetch.FetchSearchResult;
import org.elasticsearch.search.query.QuerySearchResult;
import org.elasticsearch.transport.TransportResponse;
import java.io.IOException;
/**
* This class is a base class for all search related results. It contains the shard target it
* was executed against, a shard index used to reference the result on the coordinating node
@ -32,15 +34,22 @@ import org.elasticsearch.transport.TransportResponse;
* across search phases to ensure the same point in time snapshot is used for querying and
* fetching etc.
*/
public abstract class SearchPhaseResult extends TransportResponse implements Streamable {
public abstract class SearchPhaseResult extends TransportResponse {
private SearchShardTarget searchShardTarget;
private int shardIndex = -1;
protected long requestId;
protected SearchPhaseResult() {
}
protected SearchPhaseResult(StreamInput in) throws IOException {
super(in);
}
/**
* Returns the results request ID that is used to reference the search context on the executing
* node
* Returns the results request ID that is used to reference the search context on the executing node
*/
public long getRequestId() {
return requestId;
@ -79,4 +88,9 @@ public abstract class SearchPhaseResult extends TransportResponse implements Str
* Returns the fetch result iff it's included in this response otherwise <code>null</code>
*/
public FetchSearchResult fetchResult() { return null; }
@Override
public final void readFrom(StreamInput in) {
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}
}

View File

@ -1055,9 +1055,10 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
}
public static final class CanMatchResponse extends SearchPhaseResult {
private boolean canMatch;
private final boolean canMatch;
public CanMatchResponse(StreamInput in) throws IOException {
super(in);
this.canMatch = in.readBoolean();
}
@ -1065,12 +1066,6 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
this.canMatch = canMatch;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
canMatch = in.readBoolean();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);

View File

@ -43,11 +43,22 @@ public class DfsSearchResult extends SearchPhaseResult {
private ObjectObjectHashMap<String, CollectionStatistics> fieldStatistics = HppcMaps.newNoNullKeysMap();
private int maxDoc;
public DfsSearchResult() {
}
public DfsSearchResult(StreamInput in) throws IOException {
readFrom(in);
super(in);
requestId = in.readLong();
int termsSize = in.readVInt();
if (termsSize == 0) {
terms = EMPTY_TERMS;
} else {
terms = new Term[termsSize];
for (int i = 0; i < terms.length; i++) {
terms[i] = new Term(in.readString(), in.readBytesRef());
}
}
this.termStatistics = readTermStats(in, terms);
fieldStatistics = readFieldStats(in);
maxDoc = in.readVInt();
}
public DfsSearchResult(long id, SearchShardTarget shardTarget) {
@ -87,26 +98,6 @@ public class DfsSearchResult extends SearchPhaseResult {
return fieldStatistics;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
requestId = in.readLong();
int termsSize = in.readVInt();
if (termsSize == 0) {
terms = EMPTY_TERMS;
} else {
terms = new Term[termsSize];
for (int i = 0; i < terms.length; i++) {
terms[i] = new Term(in.readString(), in.readBytesRef());
}
}
this.termStatistics = readTermStats(in, terms);
readFieldStats(in, fieldStatistics);
maxDoc = in.readVInt();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
@ -161,16 +152,9 @@ public class DfsSearchResult extends SearchPhaseResult {
}
}
public static ObjectObjectHashMap<String, CollectionStatistics> readFieldStats(StreamInput in) throws IOException {
return readFieldStats(in, null);
}
public static ObjectObjectHashMap<String, CollectionStatistics> readFieldStats(StreamInput in,
ObjectObjectHashMap<String, CollectionStatistics> fieldStatistics) throws IOException {
static ObjectObjectHashMap<String, CollectionStatistics> readFieldStats(StreamInput in) throws IOException {
final int numFieldStatistics = in.readVInt();
if (fieldStatistics == null) {
fieldStatistics = HppcMaps.newNoNullKeysMap(numFieldStatistics);
}
ObjectObjectHashMap<String, CollectionStatistics> fieldStatistics = HppcMaps.newNoNullKeysMap(numFieldStatistics);
for (int i = 0; i < numFieldStatistics; i++) {
final String field = in.readString();
assert field != null;
@ -194,7 +178,7 @@ public class DfsSearchResult extends SearchPhaseResult {
return fieldStatistics;
}
public static TermStatistics[] readTermStats(StreamInput in, Term[] terms) throws IOException {
static TermStatistics[] readTermStats(StreamInput in, Term[] terms) throws IOException {
int termsStatsSize = in.readVInt();
final TermStatistics[] termStatistics;
if (termsStatsSize == 0) {
@ -216,7 +200,6 @@ public class DfsSearchResult extends SearchPhaseResult {
return termStatistics;
}
/*
* optional statistics are set to -1 in lucene by default.
* Since we are using var longs to encode values we add one to each value
@ -227,7 +210,6 @@ public class DfsSearchResult extends SearchPhaseResult {
return value + 1;
}
/*
* See #addOne this just subtracting one and asserts that the actual value
* is positive.
@ -236,5 +218,4 @@ public class DfsSearchResult extends SearchPhaseResult {
assert value >= 0;
return value - 1;
}
}

View File

@ -39,7 +39,9 @@ public final class FetchSearchResult extends SearchPhaseResult {
}
public FetchSearchResult(StreamInput in) throws IOException {
readFrom(in);
super(in);
requestId = in.readLong();
hits = new SearchHits(in);
}
public FetchSearchResult(long id, SearchShardTarget shardTarget) {
@ -82,19 +84,6 @@ public final class FetchSearchResult extends SearchPhaseResult {
return counter++;
}
public static FetchSearchResult readFetchSearchResult(StreamInput in) throws IOException {
FetchSearchResult result = new FetchSearchResult();
result.readFrom(in);
return result;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
requestId = in.readLong();
hits = new SearchHits(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);

View File

@ -27,19 +27,15 @@ import org.elasticsearch.search.query.QuerySearchResult;
import java.io.IOException;
import static org.elasticsearch.search.fetch.FetchSearchResult.readFetchSearchResult;
import static org.elasticsearch.search.query.QuerySearchResult.readQuerySearchResult;
public final class QueryFetchSearchResult extends SearchPhaseResult {
private QuerySearchResult queryResult;
private FetchSearchResult fetchResult;
public QueryFetchSearchResult() {
}
private final QuerySearchResult queryResult;
private final FetchSearchResult fetchResult;
public QueryFetchSearchResult(StreamInput in) throws IOException {
readFrom(in);
super(in);
queryResult = new QuerySearchResult(in);
fetchResult = new FetchSearchResult(in);
}
public QueryFetchSearchResult(QuerySearchResult queryResult, FetchSearchResult fetchResult) {
@ -81,19 +77,6 @@ public final class QueryFetchSearchResult extends SearchPhaseResult {
return fetchResult;
}
public static QueryFetchSearchResult readQueryFetchSearchResult(StreamInput in) throws IOException {
QueryFetchSearchResult result = new QueryFetchSearchResult();
result.readFrom(in);
return result;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
queryResult = readQuerySearchResult(in);
fetchResult = readFetchSearchResult(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);

View File

@ -27,17 +27,15 @@ import org.elasticsearch.search.query.QuerySearchResult;
import java.io.IOException;
import static org.elasticsearch.search.fetch.QueryFetchSearchResult.readQueryFetchSearchResult;
public final class ScrollQueryFetchSearchResult extends SearchPhaseResult {
private QueryFetchSearchResult result;
public ScrollQueryFetchSearchResult() {
}
private final QueryFetchSearchResult result;
public ScrollQueryFetchSearchResult(StreamInput in) throws IOException {
readFrom(in);
super(in);
SearchShardTarget searchShardTarget = new SearchShardTarget(in);
result = new QueryFetchSearchResult(in);
setSearchShardTarget(searchShardTarget);
}
public ScrollQueryFetchSearchResult(QueryFetchSearchResult result, SearchShardTarget shardTarget) {
@ -71,14 +69,6 @@ public final class ScrollQueryFetchSearchResult extends SearchPhaseResult {
return result.fetchResult();
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
SearchShardTarget searchShardTarget = new SearchShardTarget(in);
result = readQueryFetchSearchResult(in);
setSearchShardTarget(searchShardTarget);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);

View File

@ -67,7 +67,9 @@ public final class QuerySearchResult extends SearchPhaseResult {
}
public QuerySearchResult(StreamInput in) throws IOException {
readFrom(in);
super(in);
long id = in.readLong();
readFromWithId(id, in);
}
public QuerySearchResult(long id, SearchShardTarget shardTarget) {
@ -256,19 +258,6 @@ public final class QuerySearchResult extends SearchPhaseResult {
return hasScoreDocs || hasSuggestHits();
}
public static QuerySearchResult readQuerySearchResult(StreamInput in) throws IOException {
QuerySearchResult result = new QuerySearchResult();
result.readFrom(in);
return result;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
long id = in.readLong();
readFromWithId(id, in);
}
public void readFromWithId(long id, StreamInput in) throws IOException {
this.requestId = id;
from = in.readVInt();

View File

@ -26,17 +26,15 @@ import org.elasticsearch.search.SearchShardTarget;
import java.io.IOException;
import static org.elasticsearch.search.query.QuerySearchResult.readQuerySearchResult;
public final class ScrollQuerySearchResult extends SearchPhaseResult {
private QuerySearchResult result;
public ScrollQuerySearchResult() {
}
private final QuerySearchResult result;
public ScrollQuerySearchResult(StreamInput in) throws IOException {
readFrom(in);
super(in);
SearchShardTarget shardTarget = new SearchShardTarget(in);
result = new QuerySearchResult(in);
setSearchShardTarget(shardTarget);
}
public ScrollQuerySearchResult(QuerySearchResult result, SearchShardTarget shardTarget) {
@ -61,14 +59,6 @@ public final class ScrollQuerySearchResult extends SearchPhaseResult {
return result;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
SearchShardTarget shardTarget = new SearchShardTarget(in);
result = readQuerySearchResult(in);
setSearchShardTarget(shardTarget);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);

View File

@ -27,7 +27,6 @@ import org.elasticsearch.cluster.routing.RecoverySource;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.ShardId;
@ -418,11 +417,6 @@ public class SearchAsyncActionTests extends ESTestCase {
this.node = node;
}
@Override
public void readFrom(StreamInput in) throws IOException {
}
@Override
public void writeTo(StreamOutput out) throws IOException {

View File

@ -78,7 +78,7 @@ public class QuerySearchResultTests extends ESTestCase {
public void testSerialization() throws Exception {
QuerySearchResult querySearchResult = createTestInstance();
Version version = VersionUtils.randomVersion(random());
QuerySearchResult deserialized = copyStreamable(querySearchResult, namedWriteableRegistry, QuerySearchResult::new, version);
QuerySearchResult deserialized = copyWriteable(querySearchResult, namedWriteableRegistry, QuerySearchResult::new, version);
assertEquals(querySearchResult.getRequestId(), deserialized.getRequestId());
assertNull(deserialized.getSearchShardTarget());
assertEquals(querySearchResult.topDocs().maxScore, deserialized.topDocs().maxScore, 0f);
@ -121,8 +121,7 @@ public class QuerySearchResultTests extends ESTestCase {
byte[] bytes = Base64.getDecoder().decode(message);
try (NamedWriteableAwareStreamInput in = new NamedWriteableAwareStreamInput(StreamInput.wrap(bytes), namedWriteableRegistry)) {
in.setVersion(Version.V_7_0_0);
QuerySearchResult querySearchResult = new QuerySearchResult();
querySearchResult.readFrom(in);
QuerySearchResult querySearchResult = new QuerySearchResult(in);
assertEquals(100, querySearchResult.getRequestId());
assertTrue(querySearchResult.hasAggs());
InternalAggregations aggs = (InternalAggregations)querySearchResult.consumeAggs();