more improved search result streaming, write a header with shard targets, so each hit just write an id of the targe

This commit is contained in:
kimchy 2010-03-21 15:09:02 +02:00
parent 77564cb14f
commit 297e2091e2
4 changed files with 92 additions and 22 deletions

View File

@ -29,7 +29,6 @@ import org.apache.lucene.search.ShardFieldDocSortedHitQueue;
import org.apache.lucene.search.TopFieldDocs;
import org.apache.lucene.util.PriorityQueue;
import org.elasticsearch.ElasticSearchIllegalStateException;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.dfs.AggregatedDfs;
import org.elasticsearch.search.dfs.DfsSearchResult;
@ -190,7 +189,7 @@ public class SearchPhaseController {
}
// merge hits
List<SearchHit> hits = new ArrayList<SearchHit>();
List<InternalSearchHit> hits = new ArrayList<InternalSearchHit>();
if (!fetchResults.isEmpty()) {
for (ShardDoc shardDoc : sortedDocs) {
FetchSearchResultProvider fetchResultProvider = fetchResults.get(shardDoc.shardTarget());
@ -199,14 +198,14 @@ public class SearchPhaseController {
}
FetchSearchResult fetchResult = fetchResultProvider.fetchResult();
int index = fetchResult.counterGetAndIncrement();
if (index < fetchResult.hits().hits().length) {
SearchHit searchHit = fetchResult.hits().hits()[index];
((InternalSearchHit) searchHit).shard(fetchResult.shardTarget());
if (index < fetchResult.hits().internalHits().length) {
InternalSearchHit searchHit = fetchResult.hits().internalHits()[index];
searchHit.shard(fetchResult.shardTarget());
hits.add(searchHit);
}
}
}
InternalSearchHits searchHits = new InternalSearchHits(hits.toArray(new SearchHit[hits.size()]), totalHits);
InternalSearchHits searchHits = new InternalSearchHits(hits.toArray(new InternalSearchHit[hits.size()]), totalHits);
return new InternalSearchResponse(searchHits, facets);
}
}

View File

@ -25,7 +25,6 @@ import org.apache.lucene.document.Document;
import org.apache.lucene.document.FieldSelector;
import org.apache.lucene.document.Fieldable;
import org.elasticsearch.index.mapper.*;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHitField;
import org.elasticsearch.search.SearchParseElement;
import org.elasticsearch.search.SearchPhase;
@ -66,7 +65,7 @@ public class FetchPhase implements SearchPhase {
public void execute(SearchContext context) {
FieldSelector fieldSelector = buildFieldSelectors(context);
SearchHit[] hits = new SearchHit[context.docIdsToLoad().length];
InternalSearchHit[] hits = new InternalSearchHit[context.docIdsToLoad().length];
int index = 0;
for (int docId : context.docIdsToLoad()) {
Document doc = loadDocument(context, fieldSelector, docId);

View File

@ -28,6 +28,7 @@ import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.highlight.HighlightField;
import org.elasticsearch.util.Nullable;
import org.elasticsearch.util.Unicode;
import org.elasticsearch.util.gnu.trove.TIntObjectHashMap;
import org.elasticsearch.util.io.stream.StreamInput;
import org.elasticsearch.util.io.stream.StreamOutput;
import org.elasticsearch.util.json.JsonBuilder;
@ -223,7 +224,17 @@ public class InternalSearchHit implements SearchHit {
return hit;
}
public static InternalSearchHit readSearchHit(StreamInput in, @Nullable TIntObjectHashMap<SearchShardTarget> shardLookupMap) throws IOException {
InternalSearchHit hit = new InternalSearchHit();
hit.readFrom(in, shardLookupMap);
return hit;
}
@Override public void readFrom(StreamInput in) throws IOException {
readFrom(in, null);
}
public void readFrom(StreamInput in, @Nullable TIntObjectHashMap<SearchShardTarget> shardLookupMap) throws IOException {
id = in.readUTF();
type = in.readUTF();
int size = in.readVInt();
@ -301,12 +312,23 @@ public class InternalSearchHit implements SearchHit {
highlightFields = builder.build();
}
if (in.readBoolean()) {
shard = readSearchShardTarget(in);
if (shardLookupMap != null) {
int lookupId = in.readVInt();
if (lookupId > 0) {
shard = shardLookupMap.get(lookupId);
}
} else {
if (in.readBoolean()) {
shard = readSearchShardTarget(in);
}
}
}
@Override public void writeTo(StreamOutput out) throws IOException {
writeTo(out, null);
}
public void writeTo(StreamOutput out, @Nullable Map<SearchShardTarget, Integer> shardLookupMap) throws IOException {
out.writeUTF(id);
out.writeUTF(type);
if (source == null) {
@ -337,11 +359,19 @@ public class InternalSearchHit implements SearchHit {
highlightField.writeTo(out);
}
}
if (shard == null) {
out.writeBoolean(false);
if (shardLookupMap == null) {
if (shard == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
shard.writeTo(out);
}
} else {
out.writeBoolean(true);
shard.writeTo(out);
if (shard == null) {
out.writeVInt(0);
} else {
out.writeVInt(shardLookupMap.get(shard));
}
}
}
}

View File

@ -21,12 +21,17 @@ package org.elasticsearch.search.internal;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.util.gnu.trove.TIntObjectHashMap;
import org.elasticsearch.util.io.stream.StreamInput;
import org.elasticsearch.util.io.stream.StreamOutput;
import org.elasticsearch.util.json.JsonBuilder;
import java.io.IOException;
import java.util.IdentityHashMap;
import java.util.Map;
import static org.elasticsearch.search.SearchShardTarget.*;
import static org.elasticsearch.search.internal.InternalSearchHit.*;
/**
@ -34,17 +39,17 @@ import static org.elasticsearch.search.internal.InternalSearchHit.*;
*/
public class InternalSearchHits implements SearchHits {
private static final SearchHit[] EMPTY = new SearchHit[0];
private static final InternalSearchHit[] EMPTY = new InternalSearchHit[0];
private SearchHit[] hits;
private InternalSearchHit[] hits;
private long totalHits;
private InternalSearchHits() {
InternalSearchHits() {
}
public InternalSearchHits(SearchHit[] hits, long totalHits) {
public InternalSearchHits(InternalSearchHit[] hits, long totalHits) {
this.hits = hits;
this.totalHits = totalHits;
}
@ -57,6 +62,10 @@ public class InternalSearchHits implements SearchHits {
return this.hits;
}
public InternalSearchHit[] internalHits() {
return this.hits;
}
@Override public void toJson(JsonBuilder builder, Params params) throws IOException {
builder.startObject("hits");
builder.field("total", totalHits);
@ -81,9 +90,19 @@ public class InternalSearchHits implements SearchHits {
if (size == 0) {
hits = EMPTY;
} else {
hits = new SearchHit[size];
// read the lookup table first
int lookupSize = in.readVInt();
TIntObjectHashMap<SearchShardTarget> shardLookupMap = null;
if (lookupSize > 0) {
shardLookupMap = new TIntObjectHashMap<SearchShardTarget>(lookupSize);
for (int i = 0; i < lookupSize; i++) {
shardLookupMap.put(in.readVInt(), readSearchShardTarget(in));
}
}
hits = new InternalSearchHit[size];
for (int i = 0; i < hits.length; i++) {
hits[i] = readSearchHit(in);
hits[i] = readSearchHit(in, shardLookupMap);
}
}
}
@ -91,8 +110,31 @@ public class InternalSearchHits implements SearchHits {
@Override public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(totalHits);
out.writeVInt(hits.length);
for (SearchHit hit : hits) {
hit.writeTo(out);
if (hits.length > 0) {
// write the header search shard targets (we assume identity equality)
IdentityHashMap<SearchShardTarget, Integer> shardLookupMap = new IdentityHashMap<SearchShardTarget, Integer>();
// start from 1, 0 is for null!
int counter = 1;
// put an entry for null
for (InternalSearchHit hit : hits) {
if (hit.shard() != null) {
Integer handle = shardLookupMap.get(hit.shard());
if (handle == null) {
shardLookupMap.put(hit.shard(), counter++);
}
}
}
out.writeVInt(shardLookupMap.size());
if (!shardLookupMap.isEmpty()) {
for (Map.Entry<SearchShardTarget, Integer> entry : shardLookupMap.entrySet()) {
out.writeVInt(entry.getValue());
entry.getKey().writeTo(out);
}
}
for (InternalSearchHit hit : hits) {
hit.writeTo(out, shardLookupMap.isEmpty() ? null : shardLookupMap);
}
}
}
}