diff --git a/core/src/main/java/org/elasticsearch/search/fetch/FetchSearchResult.java b/core/src/main/java/org/elasticsearch/search/fetch/FetchSearchResult.java index ed8c0358dbb..eac878569e1 100644 --- a/core/src/main/java/org/elasticsearch/search/fetch/FetchSearchResult.java +++ b/core/src/main/java/org/elasticsearch/search/fetch/FetchSearchResult.java @@ -21,14 +21,13 @@ package org.elasticsearch.search.fetch; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.search.internal.InternalSearchHits; import org.elasticsearch.transport.TransportResponse; import java.io.IOException; -import static org.elasticsearch.search.internal.InternalSearchHits.StreamContext; - /** * */ @@ -70,9 +69,17 @@ public class FetchSearchResult extends TransportResponse implements FetchSearchR } public void hits(InternalSearchHits hits) { + assert assertNoSearchTarget(hits); this.hits = hits; } + private boolean assertNoSearchTarget(InternalSearchHits hits) { + for (SearchHit hit : hits.hits()) { + assert hit.getShard() == null : "expected null but got: " + hit.getShard(); + } + return true; + } + public InternalSearchHits hits() { return hits; } @@ -96,13 +103,13 @@ public class FetchSearchResult extends TransportResponse implements FetchSearchR public void readFrom(StreamInput in) throws IOException { super.readFrom(in); id = in.readLong(); - hits = InternalSearchHits.readSearchHits(in, InternalSearchHits.streamContext().streamShardTarget(StreamContext.ShardTargetType.NO_STREAM)); + hits = InternalSearchHits.readSearchHits(in); } @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); out.writeLong(id); - hits.writeTo(out, InternalSearchHits.streamContext().streamShardTarget(StreamContext.ShardTargetType.NO_STREAM)); + hits.writeTo(out); } } diff --git a/core/src/main/java/org/elasticsearch/search/internal/InternalSearchHit.java b/core/src/main/java/org/elasticsearch/search/internal/InternalSearchHit.java index e8ba4d88aa7..227fe90ee63 100644 --- a/core/src/main/java/org/elasticsearch/search/internal/InternalSearchHit.java +++ b/core/src/main/java/org/elasticsearch/search/internal/InternalSearchHit.java @@ -39,7 +39,6 @@ import org.elasticsearch.search.SearchHitField; import org.elasticsearch.search.SearchHits; import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.search.fetch.subphase.highlight.HighlightField; -import org.elasticsearch.search.internal.InternalSearchHits.StreamContext.ShardTargetType; import org.elasticsearch.search.lookup.SourceLookup; import java.io.IOException; @@ -554,18 +553,14 @@ public class InternalSearchHit implements SearchHit { return builder; } - public static InternalSearchHit readSearchHit(StreamInput in, InternalSearchHits.StreamContext context) throws IOException { + public static InternalSearchHit readSearchHit(StreamInput in) throws IOException { InternalSearchHit hit = new InternalSearchHit(); - hit.readFrom(in, context); + hit.readFrom(in); return hit; } @Override public void readFrom(StreamInput in) throws IOException { - readFrom(in, InternalSearchHits.streamContext().streamShardTarget(ShardTargetType.STREAM)); - } - - public void readFrom(StreamInput in, InternalSearchHits.StreamContext context) throws IOException { score = in.readFloat(); id = in.readOptionalText(); type = in.readOptionalText(); @@ -644,26 +639,13 @@ public class InternalSearchHit implements SearchHit { matchedQueries[i] = in.readString(); } } - - if (context.streamShardTarget() == ShardTargetType.STREAM) { - if (in.readBoolean()) { - shard = new SearchShardTarget(in); - } - } else if (context.streamShardTarget() == ShardTargetType.LOOKUP) { - int lookupId = in.readVInt(); - if (lookupId > 0) { - shard = context.handleShardLookup().get(lookupId); - } - } - + shard = in.readOptionalWriteable(SearchShardTarget::new); size = in.readVInt(); if (size > 0) { innerHits = new HashMap<>(size); for (int i = 0; i < size; i++) { String key = in.readString(); - ShardTargetType shardTarget = context.streamShardTarget(); - InternalSearchHits value = InternalSearchHits.readSearchHits(in, context.streamShardTarget(ShardTargetType.NO_STREAM)); - context.streamShardTarget(shardTarget); + InternalSearchHits value = InternalSearchHits.readSearchHits(in); innerHits.put(key, value); } } @@ -671,10 +653,6 @@ public class InternalSearchHit implements SearchHit { @Override public void writeTo(StreamOutput out) throws IOException { - writeTo(out, InternalSearchHits.streamContext().streamShardTarget(ShardTargetType.STREAM)); - } - - public void writeTo(StreamOutput out, InternalSearchHits.StreamContext context) throws IOException { out.writeFloat(score); out.writeOptionalText(id); out.writeOptionalText(type); @@ -752,31 +730,14 @@ public class InternalSearchHit implements SearchHit { out.writeString(matchedFilter); } } - - if (context.streamShardTarget() == ShardTargetType.STREAM) { - if (shard == null) { - out.writeBoolean(false); - } else { - out.writeBoolean(true); - shard.writeTo(out); - } - } else if (context.streamShardTarget() == ShardTargetType.LOOKUP) { - if (shard == null) { - out.writeVInt(0); - } else { - out.writeVInt(context.shardHandleLookup().get(shard)); - } - } - + out.writeOptionalWriteable(shard); if (innerHits == null) { out.writeVInt(0); } else { out.writeVInt(innerHits.size()); for (Map.Entry entry : innerHits.entrySet()) { out.writeString(entry.getKey()); - ShardTargetType shardTarget = context.streamShardTarget(); - entry.getValue().writeTo(out, context.streamShardTarget(ShardTargetType.NO_STREAM)); - context.streamShardTarget(shardTarget); + entry.getValue().writeTo(out); } } } diff --git a/core/src/main/java/org/elasticsearch/search/internal/InternalSearchHits.java b/core/src/main/java/org/elasticsearch/search/internal/InternalSearchHits.java index 592d4b0751e..ab4f84c2d17 100644 --- a/core/src/main/java/org/elasticsearch/search/internal/InternalSearchHits.java +++ b/core/src/main/java/org/elasticsearch/search/internal/InternalSearchHits.java @@ -40,54 +40,6 @@ import static org.elasticsearch.search.internal.InternalSearchHit.readSearchHit; */ public class InternalSearchHits implements SearchHits { - public static class StreamContext { - - public static enum ShardTargetType { - STREAM, - LOOKUP, - NO_STREAM - } - - private IdentityHashMap shardHandleLookup = new IdentityHashMap<>(); - private IntObjectHashMap handleShardLookup = new IntObjectHashMap<>(); - private ShardTargetType streamShardTarget = ShardTargetType.STREAM; - - public StreamContext reset() { - shardHandleLookup.clear(); - handleShardLookup.clear(); - streamShardTarget = ShardTargetType.STREAM; - return this; - } - - public IdentityHashMap shardHandleLookup() { - return shardHandleLookup; - } - - public IntObjectHashMap handleShardLookup() { - return handleShardLookup; - } - - public ShardTargetType streamShardTarget() { - return streamShardTarget; - } - - public StreamContext streamShardTarget(ShardTargetType streamShardTarget) { - this.streamShardTarget = streamShardTarget; - return this; - } - } - - private static final ThreadLocal cache = new ThreadLocal() { - @Override - protected StreamContext initialValue() { - return new StreamContext(); - } - }; - - public static StreamContext streamContext() { - return cache.get().reset(); - } - public static InternalSearchHits empty() { // We shouldn't use static final instance, since that could directly be returned by native transport clients return new InternalSearchHits(EMPTY, 0, 0); @@ -186,11 +138,6 @@ public class InternalSearchHits implements SearchHits { return builder; } - public static InternalSearchHits readSearchHits(StreamInput in, StreamContext context) throws IOException { - InternalSearchHits hits = new InternalSearchHits(); - hits.readFrom(in, context); - return hits; - } public static InternalSearchHits readSearchHits(StreamInput in) throws IOException { InternalSearchHits hits = new InternalSearchHits(); @@ -200,63 +147,27 @@ public class InternalSearchHits implements SearchHits { @Override public void readFrom(StreamInput in) throws IOException { - readFrom(in, streamContext().streamShardTarget(StreamContext.ShardTargetType.LOOKUP)); - } - - public void readFrom(StreamInput in, StreamContext context) throws IOException { totalHits = in.readVLong(); maxScore = in.readFloat(); int size = in.readVInt(); if (size == 0) { hits = EMPTY; } else { - if (context.streamShardTarget() == StreamContext.ShardTargetType.LOOKUP) { - // read the lookup table first - int lookupSize = in.readVInt(); - for (int i = 0; i < lookupSize; i++) { - context.handleShardLookup().put(in.readVInt(), new SearchShardTarget(in)); - } - } - hits = new InternalSearchHit[size]; for (int i = 0; i < hits.length; i++) { - hits[i] = readSearchHit(in, context); + hits[i] = readSearchHit(in); } } } @Override public void writeTo(StreamOutput out) throws IOException { - writeTo(out, streamContext().streamShardTarget(StreamContext.ShardTargetType.LOOKUP)); - } - - public void writeTo(StreamOutput out, StreamContext context) throws IOException { out.writeVLong(totalHits); out.writeFloat(maxScore); out.writeVInt(hits.length); if (hits.length > 0) { - if (context.streamShardTarget() == StreamContext.ShardTargetType.LOOKUP) { - // start from 1, 0 is for null! - int counter = 1; - for (InternalSearchHit hit : hits) { - if (hit.shard() != null) { - Integer handle = context.shardHandleLookup().get(hit.shard()); - if (handle == null) { - context.shardHandleLookup().put(hit.shard(), counter++); - } - } - } - out.writeVInt(context.shardHandleLookup().size()); - if (!context.shardHandleLookup().isEmpty()) { - for (Map.Entry entry : context.shardHandleLookup().entrySet()) { - out.writeVInt(entry.getValue()); - entry.getKey().writeTo(out); - } - } - } - for (InternalSearchHit hit : hits) { - hit.writeTo(out, context); + hit.writeTo(out); } } } diff --git a/core/src/main/java/org/elasticsearch/search/suggest/completion/CompletionSuggestion.java b/core/src/main/java/org/elasticsearch/search/suggest/completion/CompletionSuggestion.java index 84533710781..c86c0565225 100644 --- a/core/src/main/java/org/elasticsearch/search/suggest/completion/CompletionSuggestion.java +++ b/core/src/main/java/org/elasticsearch/search/suggest/completion/CompletionSuggestion.java @@ -27,7 +27,6 @@ import org.elasticsearch.common.text.Text; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.search.internal.InternalSearchHit; import org.elasticsearch.search.internal.InternalSearchHits; -import org.elasticsearch.search.internal.InternalSearchHits.StreamContext.ShardTargetType; import org.elasticsearch.search.suggest.Suggest; import java.io.IOException; @@ -261,8 +260,7 @@ public final class CompletionSuggestion extends Suggest.Suggestion(contextSize); @@ -283,7 +281,7 @@ public final class CompletionSuggestion extends Suggest.Suggestion builders = new ArrayList<>(); boolean strictTimeBasedIndices = randomBoolean();