Added percolator improvements:
* The _percolator type now always has the _id field enabled (index=not_analyzed, store=no).
* During shard initialization the query ids are now fetched from field data; previously the ids were fetched from stored values.
* Moved the internal percolator query map from Text based to HashedBytesRef based keys.
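The HashedBytesRef change matters on the hot percolate path: the map key caches its hash, and a single mutable "spare" key is refilled for every lookup, so no per-document garbage is created. Below is a minimal, self-contained sketch of the idea — the HashedBytesKey class is hypothetical, not the actual org.elasticsearch.common.lucene.HashedBytesRef — mirroring the `spare.hash = values.getValueHashed(doc, spare.bytes)` lookups in QueryCollector further down:

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public class SpareKeyDemo {

    // Hypothetical stand-in for HashedBytesRef: a mutable byte[] key with a cached hash.
    static final class HashedBytesKey {
        byte[] bytes;
        int hash;

        HashedBytesKey(byte[] bytes) {
            reset(bytes);
        }

        void reset(byte[] bytes) {
            this.bytes = bytes;
            this.hash = Arrays.hashCode(bytes); // computed once, not on every map probe
        }

        @Override
        public int hashCode() {
            return hash;
        }

        @Override
        public boolean equals(Object o) {
            return o instanceof HashedBytesKey && Arrays.equals(bytes, ((HashedBytesKey) o).bytes);
        }
    }

    public static void main(String[] args) {
        Map<HashedBytesKey, String> queries = new HashMap<HashedBytesKey, String>();
        queries.put(new HashedBytesKey("query-1".getBytes()), "user:kimchy");

        // One mutable spare per collector: refill it for each doc instead of
        // allocating a new key object per lookup.
        HashedBytesKey spare = new HashedBytesKey(new byte[0]);
        spare.reset("query-1".getBytes());
        System.out.println(queries.get(spare)); // prints: user:kimchy
    }
}

Only the spare used for lookups is ever mutated; keys stored in the map are left untouched, which is what keeps the trick safe.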
parent 0472bac2ef
commit cbdaf4950b
org/elasticsearch/action/percolate/PercolateShardResponse.java

@@ -19,11 +19,10 @@
 package org.elasticsearch.action.percolate;
 
+import org.apache.lucene.util.BytesRef;
 import org.elasticsearch.action.support.broadcast.BroadcastShardOperationResponse;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
-import org.elasticsearch.common.text.StringText;
-import org.elasticsearch.common.text.Text;
 import org.elasticsearch.percolator.PercolatorService;
 
 import java.io.IOException;
@@ -32,9 +31,11 @@ import java.io.IOException;
  */
 public class PercolateShardResponse extends BroadcastShardOperationResponse {
 
+    private static final BytesRef[] EMPTY = new BytesRef[0];
+
     private long count;
     private float[] scores;
-    private Text[] matches;
+    private BytesRef[] matches;
 
     // Request fields:
     private boolean limit;
@@ -45,7 +46,7 @@ public class PercolateShardResponse extends BroadcastShardOperationResponse {
     public PercolateShardResponse() {
     }
 
-    public PercolateShardResponse(Text[] matches, long count, float[] scores, PercolatorService.PercolateContext context, String index, int shardId) {
+    public PercolateShardResponse(BytesRef[] matches, long count, float[] scores, PercolatorService.PercolateContext context, String index, int shardId) {
         super(index, shardId);
         this.matches = matches;
         this.count = count;
@@ -56,7 +57,7 @@ public class PercolateShardResponse extends BroadcastShardOperationResponse {
         this.score = context.score;
     }
 
-    public PercolateShardResponse(Text[] matches, long count, PercolatorService.PercolateContext context, String index, int shardId) {
+    public PercolateShardResponse(BytesRef[] matches, long count, PercolatorService.PercolateContext context, String index, int shardId) {
         super(index, shardId);
         this.matches = matches;
         this.scores = new float[0];
@@ -70,7 +71,7 @@ public class PercolateShardResponse extends BroadcastShardOperationResponse {
     public PercolateShardResponse(long count, PercolatorService.PercolateContext context, String index, int shardId) {
         super(index, shardId);
         this.count = count;
-        this.matches = StringText.EMPTY_ARRAY;
+        this.matches = EMPTY;
         this.scores = new float[0];
         this.limit = context.limit;
         this.requestedSize = context.size;
@@ -80,7 +81,7 @@ public class PercolateShardResponse extends BroadcastShardOperationResponse {
 
     public PercolateShardResponse(PercolatorService.PercolateContext context, String index, int shardId) {
         super(index, shardId);
-        this.matches = StringText.EMPTY_ARRAY;
+        this.matches = EMPTY;
         this.scores = new float[0];
         this.limit = context.limit;
         this.requestedSize = context.size;
@@ -88,7 +89,7 @@ public class PercolateShardResponse extends BroadcastShardOperationResponse {
         this.score = context.score;
     }
 
-    public Text[] matches() {
+    public BytesRef[] matches() {
         return matches;
     }
 
@@ -120,7 +121,10 @@ public class PercolateShardResponse extends BroadcastShardOperationResponse {
     public void readFrom(StreamInput in) throws IOException {
         super.readFrom(in);
         count = in.readVLong();
-        matches = in.readTextArray();
+        matches = new BytesRef[in.readVInt()];
+        for (int i = 0; i < matches.length; i++) {
+            matches[i] = in.readBytesRef();
+        }
         scores = new float[in.readVInt()];
         for (int i = 0; i < scores.length; i++) {
             scores[i] = in.readFloat();
@@ -135,7 +139,10 @@ public class PercolateShardResponse extends BroadcastShardOperationResponse {
     public void writeTo(StreamOutput out) throws IOException {
         super.writeTo(out);
         out.writeVLong(count);
-        out.writeTextArray(matches);
+        out.writeVInt(matches.length);
+        for (BytesRef match : matches) {
+            out.writeBytesRef(match);
+        }
         out.writeVLong(scores.length);
         for (float score : scores) {
             out.writeFloat(score);
org/elasticsearch/action/percolate/TransportPercolateAction.java

@@ -33,9 +33,11 @@ import org.elasticsearch.cluster.block.ClusterBlockException;
 import org.elasticsearch.cluster.block.ClusterBlockLevel;
 import org.elasticsearch.cluster.routing.GroupShardsIterator;
 import org.elasticsearch.cluster.routing.ShardRouting;
+import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.text.BytesText;
 import org.elasticsearch.common.text.StringText;
 import org.elasticsearch.common.text.Text;
 import org.elasticsearch.index.engine.DocumentMissingException;
@@ -189,7 +191,8 @@ public class TransportPercolateAction extends TransportBroadcastOperationAction<
                 Text index = new StringText(response.getIndex());
                 for (int i = 0; i < response.matches().length; i++) {
                     float score = response.scores().length == 0 ? Float.NaN : response.scores()[i];
-                    finalMatches.add(new PercolateResponse.Match(index, response.matches()[i], score));
+                    Text match = new BytesText(new BytesArray(response.matches()[i]));
+                    finalMatches.add(new PercolateResponse.Match(index, match, score));
                 }
             } else {
                 int[] slots = new int[shardResults.size()];
@@ -216,7 +219,7 @@ public class TransportPercolateAction extends TransportBroadcastOperationAction<
 
                     PercolateShardResponse shardResponse = shardResults.get(requestIndex);
                     Text index = new StringText(shardResponse.getIndex());
-                    Text match = shardResponse.matches()[itemIndex];
+                    Text match = new BytesText(new BytesArray(shardResponse.matches()[itemIndex]));
                     float score = shardResponse.scores()[itemIndex];
                     finalMatches.add(new PercolateResponse.Match(index, match, score));
                     if (finalMatches.size() == requestedSize) {
@@ -230,7 +233,8 @@ public class TransportPercolateAction extends TransportBroadcastOperationAction<
                 Text index = new StringText(response.getIndex());
                 for (int i = 0; i < response.matches().length; i++) {
                     float score = response.scores().length == 0 ? 0f : response.scores()[i];
-                    finalMatches.add(new PercolateResponse.Match(index, response.matches()[i], score));
+                    Text match = new BytesText(new BytesArray(response.matches()[i]));
+                    finalMatches.add(new PercolateResponse.Match(index, match, score));
                     if (requestedSize != 0 && finalMatches.size() == requestedSize) {
                         break outer;
                     }
org/elasticsearch/index/mapper/MapperService.java

@@ -174,6 +174,7 @@ public class MapperService extends AbstractIndexComponent implements Iterable<Do
         } else {
             percolatorMappingSource = "{\n" +
                     "    \"_percolator\":{\n" +
+                    "        \"_id\" : {\"index\": \"not_analyzed\"}," +
                     "        \"properties\" : {\n" +
                     "            \"query\" : {\n" +
                     "                \"type\" : \"object\",\n" +
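Put together, the default _percolator mapping source that MapperService builds now begins as follows; the remaining lines are unchanged by this commit and elided here:

{
    "_percolator": {
        "_id": {"index": "not_analyzed"},
        "properties": {
            "query": {
                "type": "object",
                ...

Making _id indexed (not_analyzed) is what allows the id-based field data loading in the files below.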
org/elasticsearch/index/percolator/PercolatorQueriesRegistry.java

@@ -2,16 +2,14 @@ package org.elasticsearch.index.percolator;
 
 import org.apache.lucene.index.Term;
 import org.apache.lucene.search.Query;
+import org.apache.lucene.util.BytesRef;
 import org.elasticsearch.ElasticSearchException;
-import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.bytes.BytesReference;
-import org.elasticsearch.common.bytes.HashedBytesArray;
 import org.elasticsearch.common.inject.Inject;
+import org.elasticsearch.common.lucene.HashedBytesRef;
 import org.elasticsearch.common.lucene.search.TermFilter;
 import org.elasticsearch.common.lucene.search.XConstantScoreQuery;
 import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.text.BytesText;
-import org.elasticsearch.common.text.Text;
 import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.XContentFactory;
@@ -19,6 +17,7 @@ import org.elasticsearch.common.xcontent.XContentHelper;
 import org.elasticsearch.common.xcontent.XContentParser;
 import org.elasticsearch.index.cache.IndexCache;
 import org.elasticsearch.index.engine.Engine;
+import org.elasticsearch.index.fielddata.IndexFieldDataService;
 import org.elasticsearch.index.indexing.IndexingOperationListener;
 import org.elasticsearch.index.indexing.ShardIndexingService;
 import org.elasticsearch.index.mapper.DocumentTypeListener;
@@ -49,10 +48,11 @@ public class PercolatorQueriesRegistry extends AbstractIndexShardComponent {
     private final MapperService mapperService;
     private final IndicesLifecycle indicesLifecycle;
     private final IndexCache indexCache;
+    private final IndexFieldDataService indexFieldDataService;
 
     private final ShardIndexingService indexingService;
 
-    private final ConcurrentMap<Text, Query> percolateQueries = ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency();
+    private final ConcurrentMap<HashedBytesRef, Query> percolateQueries = ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency();
     private final ShardLifecycleListener shardLifecycleListener = new ShardLifecycleListener();
     private final RealTimePercolatorOperationListener realTimePercolatorOperationListener = new RealTimePercolatorOperationListener();
     private final PercolateTypeListener percolateTypeListener = new PercolateTypeListener();
@@ -61,19 +61,21 @@ public class PercolatorQueriesRegistry extends AbstractIndexShardComponent {
 
     @Inject
     public PercolatorQueriesRegistry(ShardId shardId, @IndexSettings Settings indexSettings, IndexQueryParserService queryParserService,
-                                     ShardIndexingService indexingService, IndicesLifecycle indicesLifecycle, MapperService mapperService, IndexCache indexCache) {
+                                     ShardIndexingService indexingService, IndicesLifecycle indicesLifecycle, MapperService mapperService,
+                                     IndexCache indexCache, IndexFieldDataService indexFieldDataService) {
         super(shardId, indexSettings);
         this.queryParserService = queryParserService;
         this.mapperService = mapperService;
         this.indicesLifecycle = indicesLifecycle;
         this.indexingService = indexingService;
         this.indexCache = indexCache;
+        this.indexFieldDataService = indexFieldDataService;
 
         indicesLifecycle.addListener(shardLifecycleListener);
         mapperService.addTypeListener(percolateTypeListener);
     }
 
-    public ConcurrentMap<Text, Query> percolateQueries() {
+    public ConcurrentMap<HashedBytesRef, Query> percolateQueries() {
         return percolateQueries;
     }
 
@@ -102,18 +104,16 @@ public class PercolatorQueriesRegistry extends AbstractIndexShardComponent {
         }
     }
 
-    public void addPercolateQuery(String uidAsString, BytesReference source) {
-        Query query = parsePercolatorDocument(uidAsString, source);
-        BytesText uid = new BytesText(new HashedBytesArray(Strings.toUTF8Bytes(uidAsString)));
-        percolateQueries.put(uid, query);
+    public void addPercolateQuery(String idAsString, BytesReference source) {
+        Query query = parsePercolatorDocument(idAsString, source);
+        percolateQueries.put(new HashedBytesRef(new BytesRef(idAsString)), query);
     }
 
-    public void removePercolateQuery(String uidAsString) {
-        BytesText uid = new BytesText(new HashedBytesArray(Strings.toUTF8Bytes(uidAsString)));
-        percolateQueries.remove(uid);
+    public void removePercolateQuery(String idAsString) {
+        percolateQueries.remove(new HashedBytesRef(idAsString));
     }
 
-    Query parsePercolatorDocument(String uid, BytesReference source) {
+    Query parsePercolatorDocument(String id, BytesReference source) {
         String type = null;
         BytesReference querySource = null;
 
@@ -123,7 +123,7 @@ public class PercolatorQueriesRegistry extends AbstractIndexShardComponent {
             String currentFieldName = null;
             XContentParser.Token token = parser.nextToken(); // move the START_OBJECT
             if (token != XContentParser.Token.START_OBJECT) {
-                throw new ElasticSearchException("failed to parse query [" + uid + "], not starting with OBJECT");
+                throw new ElasticSearchException("failed to parse query [" + id + "], not starting with OBJECT");
             }
             while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
                 if (token == XContentParser.Token.FIELD_NAME) {
@@ -151,7 +151,7 @@ public class PercolatorQueriesRegistry extends AbstractIndexShardComponent {
             }
             return parseQuery(type, querySource, null);
         } catch (Exception e) {
-            throw new PercolatorException(shardId().index(), "failed to parse query [" + uid + "]", e);
+            throw new PercolatorException(shardId().index(), "failed to parse query [" + id + "]", e);
         } finally {
             if (parser != null) {
                 parser.close();
@@ -234,7 +234,7 @@ public class PercolatorQueriesRegistry extends AbstractIndexShardComponent {
                                 new TermFilter(new Term(TypeFieldMapper.NAME, PercolatorService.Constants.TYPE_NAME))
                         )
                 );
-                QueriesLoaderCollector queries = new QueriesLoaderCollector(PercolatorQueriesRegistry.this, logger);
+                QueriesLoaderCollector queries = new QueriesLoaderCollector(PercolatorQueriesRegistry.this, logger, indexFieldDataService);
                 searcher.searcher().search(query, queries);
                 percolateQueries.putAll(queries.queries());
             } finally {
org/elasticsearch/index/percolator/QueriesLoaderCollector.java

@@ -6,12 +6,17 @@ import org.apache.lucene.index.AtomicReaderContext;
 import org.apache.lucene.search.Collector;
 import org.apache.lucene.search.Query;
 import org.apache.lucene.search.Scorer;
-import org.elasticsearch.common.Strings;
-import org.elasticsearch.common.bytes.HashedBytesArray;
+import org.apache.lucene.util.BytesRef;
 import org.elasticsearch.common.logging.ESLogger;
-import org.elasticsearch.common.text.BytesText;
-import org.elasticsearch.common.text.Text;
-import org.elasticsearch.index.fieldvisitor.UidAndSourceFieldsVisitor;
+import org.elasticsearch.common.lucene.HashedBytesRef;
+import org.elasticsearch.common.settings.ImmutableSettings;
+import org.elasticsearch.index.fielddata.BytesValues;
+import org.elasticsearch.index.fielddata.FieldDataType;
+import org.elasticsearch.index.fielddata.IndexFieldData;
+import org.elasticsearch.index.fielddata.IndexFieldDataService;
+import org.elasticsearch.index.fieldvisitor.JustSourceFieldsVisitor;
+import org.elasticsearch.index.mapper.FieldMapper;
+import org.elasticsearch.index.mapper.internal.IdFieldMapper;
 
 import java.io.IOException;
 import java.util.Map;
@@ -20,43 +25,56 @@ import java.util.Map;
  */
 final class QueriesLoaderCollector extends Collector {
 
-    private final Map<Text, Query> queries = Maps.newHashMap();
+    private final Map<HashedBytesRef, Query> queries = Maps.newHashMap();
+    private final JustSourceFieldsVisitor fieldsVisitor = new JustSourceFieldsVisitor();
     private final PercolatorQueriesRegistry percolator;
+    private final IndexFieldData idFieldData;
     private final ESLogger logger;
 
+    private BytesValues idValues;
     private AtomicReader reader;
 
-    QueriesLoaderCollector(PercolatorQueriesRegistry percolator, ESLogger logger) {
+    QueriesLoaderCollector(PercolatorQueriesRegistry percolator, ESLogger logger, IndexFieldDataService indexFieldDataService) {
         this.percolator = percolator;
         this.logger = logger;
+        this.idFieldData = indexFieldDataService.getForField(
+                new FieldMapper.Names(IdFieldMapper.NAME),
+                new FieldDataType("string", ImmutableSettings.builder().put("format", "paged_bytes"))
+        );
     }
 
-    public Map<Text, Query> queries() {
+    public Map<HashedBytesRef, Query> queries() {
         return this.queries;
     }
 
     @Override
     public void collect(int doc) throws IOException {
         // the _source is the query
-        UidAndSourceFieldsVisitor fieldsVisitor = new UidAndSourceFieldsVisitor();
+        BytesRef id = idValues.getValue(doc);
+        if (id == null) {
+            return;
+        }
+        fieldsVisitor.reset();
         reader.document(doc, fieldsVisitor);
-        String id = fieldsVisitor.uid().id();
 
         try {
-            final Query parseQuery = percolator.parsePercolatorDocument(id, fieldsVisitor.source());
+            // id is only used for logging, if we fail we log the id in the catch statement
+            final Query parseQuery = percolator.parsePercolatorDocument(null, fieldsVisitor.source());
             if (parseQuery != null) {
-                queries.put(new BytesText(new HashedBytesArray(Strings.toUTF8Bytes(id))), parseQuery);
+                queries.put(new HashedBytesRef(idValues.makeSafe(id)), parseQuery);
             } else {
                 logger.warn("failed to add query [{}] - parser returned null", id);
            }
 
         } catch (Exception e) {
-            logger.warn("failed to add query [{}]", e, id);
+            logger.warn("failed to add query [{}]", e, id.utf8ToString());
         }
     }
 
     @Override
     public void setNextReader(AtomicReaderContext context) throws IOException {
-        this.reader = context.reader();
+        reader = context.reader();
+        idValues = idFieldData.load(context).getBytesValues();
     }
 
     @Override
org/elasticsearch/percolator/PercolatorService.java

@@ -35,12 +35,11 @@ import org.elasticsearch.action.percolate.PercolateShardResponse;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.component.AbstractComponent;
 import org.elasticsearch.common.inject.Inject;
+import org.elasticsearch.common.lucene.HashedBytesRef;
 import org.elasticsearch.common.lucene.Lucene;
 import org.elasticsearch.common.lucene.search.XConstantScoreQuery;
 import org.elasticsearch.common.settings.ImmutableSettings;
 import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.text.BytesText;
-import org.elasticsearch.common.text.Text;
 import org.elasticsearch.common.unit.ByteSizeUnit;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.xcontent.XContentFactory;
@@ -51,7 +50,11 @@ import org.elasticsearch.index.fielddata.BytesValues;
 import org.elasticsearch.index.fielddata.FieldDataType;
 import org.elasticsearch.index.fielddata.IndexFieldData;
 import org.elasticsearch.index.fielddata.IndexFieldDataService;
-import org.elasticsearch.index.mapper.*;
+import org.elasticsearch.index.mapper.DocumentMapper;
+import org.elasticsearch.index.mapper.FieldMapper;
+import org.elasticsearch.index.mapper.MapperService;
+import org.elasticsearch.index.mapper.ParsedDocument;
+import org.elasticsearch.index.mapper.internal.IdFieldMapper;
 import org.elasticsearch.index.mapper.internal.UidFieldMapper;
 import org.elasticsearch.index.percolator.stats.ShardPercolateService;
 import org.elasticsearch.index.service.IndexService;
@@ -289,7 +292,7 @@ public class PercolatorService extends AbstractComponent {
         public PercolateShardResponse doPercolate(PercolateShardRequest request, PercolateContext context) {
             long count = 0;
             Lucene.ExistsCollector collector = new Lucene.ExistsCollector();
-            for (Map.Entry<Text, Query> entry : context.percolateQueries.entrySet()) {
+            for (Map.Entry<HashedBytesRef, Query> entry : context.percolateQueries.entrySet()) {
                 collector.reset();
                 try {
                     context.docSearcher.search(entry.getValue(), collector);
@@ -330,10 +333,10 @@ public class PercolatorService extends AbstractComponent {
         @Override
         public PercolateShardResponse doPercolate(PercolateShardRequest request, PercolateContext context) {
             long count = 0;
-            List<Text> matches = new ArrayList<Text>();
+            List<BytesRef> matches = new ArrayList<BytesRef>();
             Lucene.ExistsCollector collector = new Lucene.ExistsCollector();
 
-            for (Map.Entry<Text, Query> entry : context.percolateQueries.entrySet()) {
+            for (Map.Entry<HashedBytesRef, Query> entry : context.percolateQueries.entrySet()) {
                 collector.reset();
                 try {
                     context.docSearcher.search(entry.getValue(), collector);
@@ -343,12 +346,12 @@ public class PercolatorService extends AbstractComponent {
 
                 if (collector.exists()) {
                     if (!context.limit || count < context.size) {
-                        matches.add(entry.getKey());
+                        matches.add(entry.getKey().bytes);
                     }
                     count++;
                 }
             }
-            return new PercolateShardResponse(matches.toArray(new Text[0]), count, context, request.index(), request.shardId());
+            return new PercolateShardResponse(matches.toArray(new BytesRef[0]), count, context, request.index(), request.shardId());
         }
     };
 
@@ -359,9 +362,9 @@ public class PercolatorService extends AbstractComponent {
             try {
                 Match match = match(logger, context);
                 queryBasedPercolating(percolatorSearcher, context, match);
-                List<Text> matches = match.matches();
+                List<BytesRef> matches = match.matches();
                 long count = match.counter();
-                return new PercolateShardResponse(matches.toArray(new Text[0]), count, context, request.index(), request.shardId());
+                return new PercolateShardResponse(matches.toArray(new BytesRef[0]), count, context, request.index(), request.shardId());
             } catch (IOException e) {
                 logger.debug("failed to execute", e);
                 throw new PercolateException(context.indexShard.shardId(), "failed to execute", e);
@@ -378,7 +381,7 @@ public class PercolatorService extends AbstractComponent {
             try {
                 MatchAndScore matchAndScore = matchAndScore(logger, context);
                 queryBasedPercolating(percolatorSearcher, context, matchAndScore);
-                Text[] matches = matchAndScore.matches().toArray(new Text[0]);
+                BytesRef[] matches = matchAndScore.matches().toArray(new BytesRef[0]);
                 float[] scores = matchAndScore.scores().toArray();
                 long count = matchAndScore.counter();
                 return new PercolateShardResponse(matches, count, scores, context, request.index(), request.shardId());
@@ -401,21 +404,23 @@ public class PercolatorService extends AbstractComponent {
                 queryBasedPercolating(percolatorSearcher, context, matchAndSort);
                 TopDocs topDocs = matchAndSort.topDocs();
                 long count = topDocs.totalHits;
-                List<Text> matches = new ArrayList<Text>(topDocs.scoreDocs.length);
+                List<BytesRef> matches = new ArrayList<BytesRef>(topDocs.scoreDocs.length);
                 float[] scores = new float[topDocs.scoreDocs.length];
 
-                IndexFieldData uidFieldData = context.fieldData.getForField(new FieldMapper.Names(UidFieldMapper.NAME), new FieldDataType("string", ImmutableSettings.builder().put("format", "paged_bytes")));
+                IndexFieldData idFieldData = context.fieldData.getForField(
+                        new FieldMapper.Names(IdFieldMapper.NAME),
+                        new FieldDataType("string", ImmutableSettings.builder().put("format", "paged_bytes"))
+                );
                 int i = 0;
                 for (ScoreDoc scoreDoc : topDocs.scoreDocs) {
                     int segmentIdx = ReaderUtil.subIndex(scoreDoc.doc, percolatorSearcher.reader().leaves());
                     AtomicReaderContext atomicReaderContext = percolatorSearcher.reader().leaves().get(segmentIdx);
-                    BytesValues values = uidFieldData.load(atomicReaderContext).getBytesValues();
-                    BytesRef uid = values.getValue(scoreDoc.doc - atomicReaderContext.docBase);
-                    Text id = new BytesText(Uid.idFromUid(uid));
-                    matches.add(id);
+                    BytesValues values = idFieldData.load(atomicReaderContext).getBytesValues();
+                    BytesRef id = values.getValue(scoreDoc.doc - atomicReaderContext.docBase);
+                    matches.add(values.makeSafe(id));
                     scores[i++] = scoreDoc.score;
                 }
-                return new PercolateShardResponse(matches.toArray(new Text[matches.size()]), count, scores, context, request.index(), request.shardId());
+                return new PercolateShardResponse(matches.toArray(new BytesRef[matches.size()]), count, scores, context, request.index(), request.shardId());
             } catch (Exception e) {
                 logger.debug("failed to execute", e);
                 throw new PercolateException(context.indexShard.shardId(), "failed to execute", e);
@@ -441,7 +446,7 @@ public class PercolatorService extends AbstractComponent {
         public boolean sort;
 
         Query query;
-        ConcurrentMap<Text, Query> percolateQueries;
+        ConcurrentMap<HashedBytesRef, Query> percolateQueries;
         IndexSearcher docSearcher;
         IndexShard indexShard;
         IndexFieldDataService fieldData;
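The sorting variant above resolves each hit's id through per-segment field data, so the index-wide `scoreDoc.doc` must first be translated into a segment-local doc id via `scoreDoc.doc - atomicReaderContext.docBase`. A small self-contained sketch of that translation, with hypothetical segment sizes (in Lucene, `ReaderUtil.subIndex` performs the segment search):

public class DocBaseDemo {
    public static void main(String[] args) {
        int[] segmentSizes = {100, 250, 75};   // three segments, made-up sizes
        int globalDoc = 310;                    // index-wide doc id of a hit

        int docBase = 0;
        for (int segment = 0; segment < segmentSizes.length; segment++) {
            if (globalDoc < docBase + segmentSizes[segment]) {
                // segment-local id, as in: scoreDoc.doc - atomicReaderContext.docBase
                System.out.println("segment=" + segment + " localDoc=" + (globalDoc - docBase));
                return;
            }
            docBase += segmentSizes[segment];
        }
    }
}

Running this prints segment=1 localDoc=210: doc 310 falls past the 100 docs of segment 0, so it is doc 210 within segment 1.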
org/elasticsearch/percolator/QueryCollector.java

@@ -5,16 +5,14 @@ import org.apache.lucene.index.AtomicReaderContext;
 import org.apache.lucene.search.*;
 import org.apache.lucene.util.BytesRef;
 import org.elasticsearch.common.logging.ESLogger;
+import org.elasticsearch.common.lucene.HashedBytesRef;
 import org.elasticsearch.common.lucene.Lucene;
 import org.elasticsearch.common.settings.ImmutableSettings;
-import org.elasticsearch.common.text.BytesText;
-import org.elasticsearch.common.text.Text;
 import org.elasticsearch.index.fielddata.BytesValues;
 import org.elasticsearch.index.fielddata.FieldDataType;
 import org.elasticsearch.index.fielddata.IndexFieldData;
 import org.elasticsearch.index.mapper.FieldMapper;
-import org.elasticsearch.index.mapper.Uid;
-import org.elasticsearch.index.mapper.internal.UidFieldMapper;
+import org.elasticsearch.index.mapper.internal.IdFieldMapper;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -25,12 +23,13 @@ import java.util.concurrent.ConcurrentMap;
  */
 abstract class QueryCollector extends Collector {
 
-    final IndexFieldData uidFieldData;
+    final IndexFieldData idFieldData;
     final IndexSearcher searcher;
-    final ConcurrentMap<Text, Query> queries;
+    final ConcurrentMap<HashedBytesRef, Query> queries;
     final ESLogger logger;
 
     final Lucene.ExistsCollector collector = new Lucene.ExistsCollector();
+    final HashedBytesRef spare = new HashedBytesRef(new BytesRef());
 
     BytesValues values;
 
@@ -38,8 +37,10 @@ abstract class QueryCollector extends Collector {
         this.logger = logger;
         this.queries = context.percolateQueries;
         this.searcher = context.docSearcher;
-        // TODO: when we move to a UID level mapping def on the index level, we can use that one, now, its per type, and we can't easily choose one
-        this.uidFieldData = context.fieldData.getForField(new FieldMapper.Names(UidFieldMapper.NAME), new FieldDataType("string", ImmutableSettings.builder().put("format", "paged_bytes")));
+        this.idFieldData = context.fieldData.getForField(
+                new FieldMapper.Names(IdFieldMapper.NAME),
+                new FieldDataType("string", ImmutableSettings.builder().put("format", "paged_bytes"))
+        );
     }
 
     @Override
@@ -49,7 +50,7 @@ abstract class QueryCollector extends Collector {
     @Override
     public void setNextReader(AtomicReaderContext context) throws IOException {
-        // we use the UID because id might not be indexed
-        values = uidFieldData.load(context).getBytesValues();
+        values = idFieldData.load(context).getBytesValues();
     }
 
     @Override
@@ -76,7 +77,7 @@ abstract class QueryCollector extends Collector {
 
     final static class Match extends QueryCollector {
 
-        private final List<Text> matches = new ArrayList<Text>();
+        private final List<BytesRef> matches = new ArrayList<BytesRef>();
         private final boolean limit;
         private final int size;
         private long counter = 0;
@@ -89,12 +90,8 @@ abstract class QueryCollector extends Collector {
 
         @Override
         public void collect(int doc) throws IOException {
-            BytesRef uid = values.getValue(doc);
-            if (uid == null) {
-                return;
-            }
-            Text id = new BytesText(Uid.idFromUid(uid));
-            Query query = queries.get(id);
+            spare.hash = values.getValueHashed(doc, spare.bytes);
+            Query query = queries.get(spare);
             if (query == null) {
                 // log???
                 return;
@@ -105,12 +102,12 @@ abstract class QueryCollector extends Collector {
                 searcher.search(query, collector);
                 if (collector.exists()) {
                     if (!limit || counter < size) {
-                        matches.add(id);
+                        matches.add(values.makeSafe(spare.bytes));
                     }
                     counter++;
                 }
             } catch (IOException e) {
-                logger.warn("[" + id + "] failed to execute query", e);
+                logger.warn("[" + spare.bytes.utf8ToString() + "] failed to execute query", e);
             }
         }
 
@@ -118,7 +115,7 @@ abstract class QueryCollector extends Collector {
             return counter;
         }
 
-        List<Text> matches() {
+        List<BytesRef> matches() {
             return matches;
         }
 
@@ -136,12 +133,8 @@ abstract class QueryCollector extends Collector {
 
         @Override
        public void collect(int doc) throws IOException {
-            BytesRef uid = values.getValue(doc);
-            if (uid == null) {
-                return;
-            }
-            Text id = new BytesText(Uid.idFromUid(uid));
-            Query query = queries.get(id);
+            spare.hash = values.getValueHashed(doc, spare.bytes);
+            Query query = queries.get(spare);
             if (query == null) {
                 // log???
                 return;
@@ -154,7 +147,7 @@ abstract class QueryCollector extends Collector {
                     topDocsCollector.collect(doc);
                 }
             } catch (IOException e) {
-                logger.warn("[" + id + "] failed to execute query", e);
+                logger.warn("[" + spare.bytes.utf8ToString() + "] failed to execute query", e);
             }
         }
 
@@ -177,7 +170,7 @@ abstract class QueryCollector extends Collector {
 
     final static class MatchAndScore extends QueryCollector {
 
-        private final List<Text> matches = new ArrayList<Text>();
+        private final List<BytesRef> matches = new ArrayList<BytesRef>();
         // TODO: Use thread local in order to cache the scores lists?
         private final TFloatArrayList scores = new TFloatArrayList();
         private final boolean limit;
@@ -194,12 +187,8 @@ abstract class QueryCollector extends Collector {
 
         @Override
         public void collect(int doc) throws IOException {
-            BytesRef uid = values.getValue(doc);
-            if (uid == null) {
-                return;
-            }
-            Text id = new BytesText(Uid.idFromUid(uid));
-            Query query = queries.get(id);
+            spare.hash = values.getValueHashed(doc, spare.bytes);
+            Query query = queries.get(spare);
             if (query == null) {
                 // log???
                 return;
@@ -210,13 +199,13 @@ abstract class QueryCollector extends Collector {
                 searcher.search(query, collector);
                 if (collector.exists()) {
                     if (!limit || counter < size) {
-                        matches.add(id);
+                        matches.add(values.makeSafe(spare.bytes));
                         scores.add(scorer.score());
                     }
                     counter++;
                 }
             } catch (IOException e) {
-                logger.warn("[" + id + "] failed to execute query", e);
+                logger.warn("[" + spare.bytes.utf8ToString() + "] failed to execute query", e);
             }
         }
 
@@ -229,7 +218,7 @@ abstract class QueryCollector extends Collector {
             return counter;
         }
 
-        List<Text> matches() {
+        List<BytesRef> matches() {
             return matches;
        }
 
@@ -248,12 +237,8 @@ abstract class QueryCollector extends Collector {
 
         @Override
         public void collect(int doc) throws IOException {
-            BytesRef uid = values.getValue(doc);
-            if (uid == null) {
-                return;
-            }
-            Text id = new BytesText(Uid.idFromUid(uid));
-            Query query = queries.get(id);
+            spare.hash = values.getValueHashed(doc, spare.bytes);
+            Query query = queries.get(spare);
             if (query == null) {
                 // log???
                 return;
@@ -266,7 +251,7 @@ abstract class QueryCollector extends Collector {
                     counter++;
                 }
             } catch (IOException e) {
-                logger.warn("[" + id + "] failed to execute query", e);
+                logger.warn("[" + spare.bytes.utf8ToString() + "] failed to execute query", e);
            }
         }
 