Remove poor-mans compression in InternalSearchHit and friends (#20472)

We still use some crazy poor mans compression in InternalSearchHit that
uses a thread local and an unordered map as a lookup table if requested.
Stuff like this should be handled by compression on the transport layer
rather than in-line in the serialization code. This code is complex enough.
This commit is contained in:
Simon Willnauer 2016-09-14 15:25:25 +02:00 committed by GitHub
parent c1e84618a6
commit d402ca0dd7
6 changed files with 28 additions and 155 deletions

View File

@ -21,14 +21,13 @@ package org.elasticsearch.search.fetch;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.internal.InternalSearchHits;
import org.elasticsearch.transport.TransportResponse;
import java.io.IOException;
import static org.elasticsearch.search.internal.InternalSearchHits.StreamContext;
/**
*
*/
@ -70,9 +69,17 @@ public class FetchSearchResult extends TransportResponse implements FetchSearchR
}
public void hits(InternalSearchHits hits) {
assert assertNoSearchTarget(hits);
this.hits = hits;
}
private boolean assertNoSearchTarget(InternalSearchHits hits) {
for (SearchHit hit : hits.hits()) {
assert hit.getShard() == null : "expected null but got: " + hit.getShard();
}
return true;
}
public InternalSearchHits hits() {
return hits;
}
@ -96,13 +103,13 @@ public class FetchSearchResult extends TransportResponse implements FetchSearchR
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
id = in.readLong();
hits = InternalSearchHits.readSearchHits(in, InternalSearchHits.streamContext().streamShardTarget(StreamContext.ShardTargetType.NO_STREAM));
hits = InternalSearchHits.readSearchHits(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeLong(id);
hits.writeTo(out, InternalSearchHits.streamContext().streamShardTarget(StreamContext.ShardTargetType.NO_STREAM));
hits.writeTo(out);
}
}

View File

@ -39,7 +39,6 @@ import org.elasticsearch.search.SearchHitField;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightField;
import org.elasticsearch.search.internal.InternalSearchHits.StreamContext.ShardTargetType;
import org.elasticsearch.search.lookup.SourceLookup;
import java.io.IOException;
@ -554,18 +553,14 @@ public class InternalSearchHit implements SearchHit {
return builder;
}
public static InternalSearchHit readSearchHit(StreamInput in, InternalSearchHits.StreamContext context) throws IOException {
public static InternalSearchHit readSearchHit(StreamInput in) throws IOException {
InternalSearchHit hit = new InternalSearchHit();
hit.readFrom(in, context);
hit.readFrom(in);
return hit;
}
@Override
public void readFrom(StreamInput in) throws IOException {
readFrom(in, InternalSearchHits.streamContext().streamShardTarget(ShardTargetType.STREAM));
}
public void readFrom(StreamInput in, InternalSearchHits.StreamContext context) throws IOException {
score = in.readFloat();
id = in.readOptionalText();
type = in.readOptionalText();
@ -644,26 +639,13 @@ public class InternalSearchHit implements SearchHit {
matchedQueries[i] = in.readString();
}
}
if (context.streamShardTarget() == ShardTargetType.STREAM) {
if (in.readBoolean()) {
shard = new SearchShardTarget(in);
}
} else if (context.streamShardTarget() == ShardTargetType.LOOKUP) {
int lookupId = in.readVInt();
if (lookupId > 0) {
shard = context.handleShardLookup().get(lookupId);
}
}
shard = in.readOptionalWriteable(SearchShardTarget::new);
size = in.readVInt();
if (size > 0) {
innerHits = new HashMap<>(size);
for (int i = 0; i < size; i++) {
String key = in.readString();
ShardTargetType shardTarget = context.streamShardTarget();
InternalSearchHits value = InternalSearchHits.readSearchHits(in, context.streamShardTarget(ShardTargetType.NO_STREAM));
context.streamShardTarget(shardTarget);
InternalSearchHits value = InternalSearchHits.readSearchHits(in);
innerHits.put(key, value);
}
}
@ -671,10 +653,6 @@ public class InternalSearchHit implements SearchHit {
@Override
public void writeTo(StreamOutput out) throws IOException {
writeTo(out, InternalSearchHits.streamContext().streamShardTarget(ShardTargetType.STREAM));
}
public void writeTo(StreamOutput out, InternalSearchHits.StreamContext context) throws IOException {
out.writeFloat(score);
out.writeOptionalText(id);
out.writeOptionalText(type);
@ -752,31 +730,14 @@ public class InternalSearchHit implements SearchHit {
out.writeString(matchedFilter);
}
}
if (context.streamShardTarget() == ShardTargetType.STREAM) {
if (shard == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
shard.writeTo(out);
}
} else if (context.streamShardTarget() == ShardTargetType.LOOKUP) {
if (shard == null) {
out.writeVInt(0);
} else {
out.writeVInt(context.shardHandleLookup().get(shard));
}
}
out.writeOptionalWriteable(shard);
if (innerHits == null) {
out.writeVInt(0);
} else {
out.writeVInt(innerHits.size());
for (Map.Entry<String, InternalSearchHits> entry : innerHits.entrySet()) {
out.writeString(entry.getKey());
ShardTargetType shardTarget = context.streamShardTarget();
entry.getValue().writeTo(out, context.streamShardTarget(ShardTargetType.NO_STREAM));
context.streamShardTarget(shardTarget);
entry.getValue().writeTo(out);
}
}
}

View File

@ -40,54 +40,6 @@ import static org.elasticsearch.search.internal.InternalSearchHit.readSearchHit;
*/
public class InternalSearchHits implements SearchHits {
public static class StreamContext {
public static enum ShardTargetType {
STREAM,
LOOKUP,
NO_STREAM
}
private IdentityHashMap<SearchShardTarget, Integer> shardHandleLookup = new IdentityHashMap<>();
private IntObjectHashMap<SearchShardTarget> handleShardLookup = new IntObjectHashMap<>();
private ShardTargetType streamShardTarget = ShardTargetType.STREAM;
public StreamContext reset() {
shardHandleLookup.clear();
handleShardLookup.clear();
streamShardTarget = ShardTargetType.STREAM;
return this;
}
public IdentityHashMap<SearchShardTarget, Integer> shardHandleLookup() {
return shardHandleLookup;
}
public IntObjectHashMap<SearchShardTarget> handleShardLookup() {
return handleShardLookup;
}
public ShardTargetType streamShardTarget() {
return streamShardTarget;
}
public StreamContext streamShardTarget(ShardTargetType streamShardTarget) {
this.streamShardTarget = streamShardTarget;
return this;
}
}
private static final ThreadLocal<StreamContext> cache = new ThreadLocal<StreamContext>() {
@Override
protected StreamContext initialValue() {
return new StreamContext();
}
};
public static StreamContext streamContext() {
return cache.get().reset();
}
public static InternalSearchHits empty() {
// We shouldn't use static final instance, since that could directly be returned by native transport clients
return new InternalSearchHits(EMPTY, 0, 0);
@ -186,11 +138,6 @@ public class InternalSearchHits implements SearchHits {
return builder;
}
public static InternalSearchHits readSearchHits(StreamInput in, StreamContext context) throws IOException {
InternalSearchHits hits = new InternalSearchHits();
hits.readFrom(in, context);
return hits;
}
public static InternalSearchHits readSearchHits(StreamInput in) throws IOException {
InternalSearchHits hits = new InternalSearchHits();
@ -200,63 +147,27 @@ public class InternalSearchHits implements SearchHits {
@Override
public void readFrom(StreamInput in) throws IOException {
readFrom(in, streamContext().streamShardTarget(StreamContext.ShardTargetType.LOOKUP));
}
public void readFrom(StreamInput in, StreamContext context) throws IOException {
totalHits = in.readVLong();
maxScore = in.readFloat();
int size = in.readVInt();
if (size == 0) {
hits = EMPTY;
} else {
if (context.streamShardTarget() == StreamContext.ShardTargetType.LOOKUP) {
// read the lookup table first
int lookupSize = in.readVInt();
for (int i = 0; i < lookupSize; i++) {
context.handleShardLookup().put(in.readVInt(), new SearchShardTarget(in));
}
}
hits = new InternalSearchHit[size];
for (int i = 0; i < hits.length; i++) {
hits[i] = readSearchHit(in, context);
hits[i] = readSearchHit(in);
}
}
}
@Override
public void writeTo(StreamOutput out) throws IOException {
writeTo(out, streamContext().streamShardTarget(StreamContext.ShardTargetType.LOOKUP));
}
public void writeTo(StreamOutput out, StreamContext context) throws IOException {
out.writeVLong(totalHits);
out.writeFloat(maxScore);
out.writeVInt(hits.length);
if (hits.length > 0) {
if (context.streamShardTarget() == StreamContext.ShardTargetType.LOOKUP) {
// start from 1, 0 is for null!
int counter = 1;
for (InternalSearchHit hit : hits) {
if (hit.shard() != null) {
Integer handle = context.shardHandleLookup().get(hit.shard());
if (handle == null) {
context.shardHandleLookup().put(hit.shard(), counter++);
}
}
}
out.writeVInt(context.shardHandleLookup().size());
if (!context.shardHandleLookup().isEmpty()) {
for (Map.Entry<SearchShardTarget, Integer> entry : context.shardHandleLookup().entrySet()) {
out.writeVInt(entry.getValue());
entry.getKey().writeTo(out);
}
}
}
for (InternalSearchHit hit : hits) {
hit.writeTo(out, context);
hit.writeTo(out);
}
}
}

View File

@ -27,7 +27,6 @@ import org.elasticsearch.common.text.Text;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.search.internal.InternalSearchHit;
import org.elasticsearch.search.internal.InternalSearchHits;
import org.elasticsearch.search.internal.InternalSearchHits.StreamContext.ShardTargetType;
import org.elasticsearch.search.suggest.Suggest;
import java.io.IOException;
@ -261,8 +260,7 @@ public final class CompletionSuggestion extends Suggest.Suggestion<CompletionSug
super.readFrom(in);
this.doc = Lucene.readScoreDoc(in);
if (in.readBoolean()) {
this.hit = InternalSearchHit.readSearchHit(in,
InternalSearchHits.streamContext().streamShardTarget(ShardTargetType.STREAM));
this.hit = InternalSearchHit.readSearchHit(in);
}
int contextSize = in.readInt();
this.contexts = new LinkedHashMap<>(contextSize);
@ -283,7 +281,7 @@ public final class CompletionSuggestion extends Suggest.Suggestion<CompletionSug
Lucene.writeScoreDoc(out, doc);
if (hit != null) {
out.writeBoolean(true);
hit.writeTo(out, InternalSearchHits.streamContext().streamShardTarget(ShardTargetType.STREAM));
hit.writeTo(out);
} else {
out.writeBoolean(false);
}

View File

@ -32,6 +32,7 @@ import java.util.HashMap;
import java.util.Map;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
public class InternalSearchHitTests extends ESTestCase {
@ -63,19 +64,15 @@ public class InternalSearchHitTests extends ESTestCase {
InternalSearchHits hits = new InternalSearchHits(new InternalSearchHit[]{hit1, hit2}, 2, 1f);
InternalSearchHits.StreamContext context = new InternalSearchHits.StreamContext();
context.streamShardTarget(InternalSearchHits.StreamContext.ShardTargetType.STREAM);
BytesStreamOutput output = new BytesStreamOutput();
hits.writeTo(output, context);
hits.writeTo(output);
InputStream input = output.bytes().streamInput();
context = new InternalSearchHits.StreamContext();
context.streamShardTarget(InternalSearchHits.StreamContext.ShardTargetType.STREAM);
InternalSearchHits results = InternalSearchHits.readSearchHits(new InputStreamStreamInput(input), context);
InternalSearchHits results = InternalSearchHits.readSearchHits(new InputStreamStreamInput(input));
assertThat(results.getAt(0).shard(), equalTo(target));
assertThat(results.getAt(0).getInnerHits().get("1").getAt(0).shard(), nullValue());
assertThat(results.getAt(0).getInnerHits().get("1").getAt(0).getInnerHits().get("1").getAt(0).shard(), nullValue());
assertThat(results.getAt(0).getInnerHits().get("1").getAt(1).shard(), nullValue());
assertThat(results.getAt(0).getInnerHits().get("2").getAt(0).shard(), nullValue());
assertThat(results.getAt(0).getInnerHits().get("1").getAt(0).shard(), notNullValue());
assertThat(results.getAt(0).getInnerHits().get("1").getAt(0).getInnerHits().get("1").getAt(0).shard(), notNullValue());
assertThat(results.getAt(0).getInnerHits().get("1").getAt(1).shard(), notNullValue());
assertThat(results.getAt(0).getInnerHits().get("2").getAt(0).shard(), notNullValue());
assertThat(results.getAt(1).shard(), equalTo(target));
}

View File

@ -122,7 +122,6 @@ public class FieldSortIT extends ESIntegTestCase {
}
}
@LuceneTestCase.BadApple(bugUrl = "simon is working on this")
public void testIssue6614() throws ExecutionException, InterruptedException {
List<IndexRequestBuilder> builders = new ArrayList<>();
boolean strictTimeBasedIndices = randomBoolean();