When a shard is recovered (from the gateway or from another shard), sniff mappings from it. Also, improved search document-loading performance when no explicit fields are provided.
This commit is contained in:
parent
2e81730272
commit
82e5b7d1c9
|
@ -21,6 +21,7 @@ package org.elasticsearch.index.mapper;
|
|||
|
||||
import org.apache.lucene.document.Document;
|
||||
import org.apache.lucene.document.FieldSelector;
|
||||
import org.apache.lucene.util.StringHelper;
|
||||
import org.elasticsearch.util.concurrent.ThreadSafe;
|
||||
|
||||
/**
|
||||
|
@ -31,8 +32,10 @@ import org.elasticsearch.util.concurrent.ThreadSafe;
|
|||
@ThreadSafe
|
||||
public interface SourceFieldMapper extends FieldMapper<String> {
|
||||
|
||||
public final String NAME = StringHelper.intern("_source");
|
||||
|
||||
/**
|
||||
* Returns <tt>true</tt> if the source field mapper is enalbed or not.
|
||||
* Returns <tt>true</tt> if the source field mapper is enabled or not.
|
||||
*/
|
||||
boolean enabled();
|
||||
|
||||
|
|
|
@ -21,6 +21,7 @@ package org.elasticsearch.index.mapper;
|
|||
|
||||
import org.apache.lucene.document.Document;
|
||||
import org.apache.lucene.index.Term;
|
||||
import org.apache.lucene.util.StringHelper;
|
||||
import org.elasticsearch.util.concurrent.ThreadSafe;
|
||||
|
||||
/**
|
||||
|
@ -31,6 +32,8 @@ import org.elasticsearch.util.concurrent.ThreadSafe;
|
|||
@ThreadSafe
|
||||
public interface TypeFieldMapper extends FieldMapper<String> {
|
||||
|
||||
public static final String NAME = StringHelper.intern("_type");
|
||||
|
||||
String value(Document document);
|
||||
|
||||
Term term(String value);
|
||||
|
|
|
@ -0,0 +1,51 @@
|
|||
/*
|
||||
* Licensed to Elastic Search and Shay Banon under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Elastic Search licenses this
|
||||
* file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.index.mapper;
|
||||
|
||||
import org.apache.lucene.document.FieldSelector;
|
||||
import org.apache.lucene.document.FieldSelectorResult;
|
||||
|
||||
/**
|
||||
* An optimized field selector that loads just the uid and the source.
|
||||
*
|
||||
* @author kimchy (shay.banon)
|
||||
*/
|
||||
public class UidAndSourceFieldSelector implements FieldSelector {
|
||||
|
||||
private int match = 0;
|
||||
|
||||
@Override public FieldSelectorResult accept(String fieldName) {
|
||||
if (UidFieldMapper.NAME.equals(fieldName)) {
|
||||
if (++match == 2) {
|
||||
match = 0;
|
||||
return FieldSelectorResult.LOAD_AND_BREAK;
|
||||
}
|
||||
return FieldSelectorResult.LOAD;
|
||||
}
|
||||
if (SourceFieldMapper.NAME.equals(fieldName)) {
|
||||
if (++match == 2) {
|
||||
match = 0;
|
||||
return FieldSelectorResult.LOAD_AND_BREAK;
|
||||
}
|
||||
return FieldSelectorResult.LOAD;
|
||||
}
|
||||
return FieldSelectorResult.NO_LOAD;
|
||||
}
|
||||
}
|
|
@ -20,6 +20,7 @@
|
|||
package org.elasticsearch.index.mapper;
|
||||
|
||||
import org.apache.lucene.index.Term;
|
||||
import org.apache.lucene.util.StringHelper;
|
||||
import org.elasticsearch.util.concurrent.ThreadSafe;
|
||||
|
||||
/**
|
||||
|
@ -28,6 +29,8 @@ import org.elasticsearch.util.concurrent.ThreadSafe;
|
|||
@ThreadSafe
|
||||
public interface UidFieldMapper extends FieldMapper<Uid> {
|
||||
|
||||
public static final String NAME = StringHelper.intern("_uid");
|
||||
|
||||
String name();
|
||||
|
||||
Term term(String type, String id);
|
||||
|
|
|
@ -24,6 +24,7 @@ import org.apache.lucene.document.Field;
|
|||
import org.apache.lucene.document.Fieldable;
|
||||
import org.apache.lucene.index.Term;
|
||||
import org.apache.lucene.search.*;
|
||||
import org.apache.lucene.util.StringHelper;
|
||||
import org.elasticsearch.index.mapper.FieldMapper;
|
||||
import org.elasticsearch.index.mapper.FieldMapperListener;
|
||||
import org.elasticsearch.util.lucene.search.TermFilter;
|
||||
|
@ -194,9 +195,9 @@ public abstract class JsonFieldMapper<T> implements FieldMapper<T>, JsonMapper {
|
|||
|
||||
protected JsonFieldMapper(String name, String indexName, String fullName, Field.Index index, Field.Store store, Field.TermVector termVector,
|
||||
float boost, boolean omitNorms, boolean omitTermFreqAndPositions, Analyzer indexAnalyzer, Analyzer searchAnalyzer) {
|
||||
this.name = name;
|
||||
this.indexName = indexName;
|
||||
this.fullName = fullName;
|
||||
this.name = StringHelper.intern(name);
|
||||
this.indexName = StringHelper.intern(indexName);
|
||||
this.fullName = StringHelper.intern(fullName);
|
||||
this.index = index;
|
||||
this.store = store;
|
||||
this.termVector = termVector;
|
||||
|
|
|
@ -34,7 +34,7 @@ import java.io.IOException;
|
|||
public class JsonSourceFieldMapper extends JsonFieldMapper<String> implements SourceFieldMapper {
|
||||
|
||||
public static class Defaults extends JsonFieldMapper.Defaults {
|
||||
public static final String NAME = "_source";
|
||||
public static final String NAME = SourceFieldMapper.NAME;
|
||||
public static final boolean ENABLED = true;
|
||||
public static final Field.Index INDEX = Field.Index.NO;
|
||||
public static final Field.Store STORE = Field.Store.YES;
|
||||
|
|
|
@ -34,8 +34,8 @@ import java.io.IOException;
|
|||
public class JsonTypeFieldMapper extends JsonFieldMapper<String> implements TypeFieldMapper {
|
||||
|
||||
public static class Defaults extends JsonFieldMapper.Defaults {
|
||||
public static final String NAME = "_type";
|
||||
public static final String INDEX_NAME = "_type";
|
||||
public static final String NAME = TypeFieldMapper.NAME;
|
||||
public static final String INDEX_NAME = TypeFieldMapper.NAME;
|
||||
public static final Field.Index INDEX = Field.Index.NOT_ANALYZED;
|
||||
public static final Field.Store STORE = Field.Store.NO;
|
||||
public static final boolean OMIT_NORMS = true;
|
||||
|
|
|
@ -35,7 +35,7 @@ import java.io.IOException;
|
|||
public class JsonUidFieldMapper extends JsonFieldMapper<Uid> implements UidFieldMapper {
|
||||
|
||||
public static class Defaults extends JsonFieldMapper.Defaults {
|
||||
public static final String NAME = "_uid";
|
||||
public static final String NAME = UidFieldMapper.NAME;
|
||||
public static final Field.Index INDEX = Field.Index.NOT_ANALYZED;
|
||||
public static final boolean OMIT_NORMS = true;
|
||||
public static final boolean OMIT_TERM_FREQ_AND_POSITIONS = true;
|
||||
|
|
|
@ -21,7 +21,10 @@ package org.elasticsearch.index.shard;
|
|||
|
||||
import com.google.inject.Inject;
|
||||
import org.apache.lucene.document.Document;
|
||||
import org.apache.lucene.document.FieldSelector;
|
||||
import org.apache.lucene.index.IndexReader;
|
||||
import org.apache.lucene.index.Term;
|
||||
import org.apache.lucene.index.TermEnum;
|
||||
import org.apache.lucene.search.*;
|
||||
import org.elasticsearch.ElasticSearchException;
|
||||
import org.elasticsearch.ElasticSearchIllegalArgumentException;
|
||||
|
@ -31,10 +34,7 @@ import org.elasticsearch.index.cache.filter.FilterCache;
|
|||
import org.elasticsearch.index.engine.Engine;
|
||||
import org.elasticsearch.index.engine.EngineException;
|
||||
import org.elasticsearch.index.engine.ScheduledRefreshableEngine;
|
||||
import org.elasticsearch.index.mapper.DocumentMapper;
|
||||
import org.elasticsearch.index.mapper.DocumentMapperNotFoundException;
|
||||
import org.elasticsearch.index.mapper.MapperService;
|
||||
import org.elasticsearch.index.mapper.ParsedDocument;
|
||||
import org.elasticsearch.index.mapper.*;
|
||||
import org.elasticsearch.index.query.IndexQueryParser;
|
||||
import org.elasticsearch.index.query.IndexQueryParserMissingException;
|
||||
import org.elasticsearch.index.query.IndexQueryParserService;
|
||||
|
@ -53,8 +53,11 @@ import org.elasticsearch.util.lucene.search.TermFilter;
|
|||
import org.elasticsearch.util.settings.Settings;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ScheduledFuture;
|
||||
|
||||
import static com.google.common.collect.Lists.*;
|
||||
|
||||
/**
|
||||
* @author kimchy (Shay Banon)
|
||||
*/
|
||||
|
@ -76,6 +79,11 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
|
|||
|
||||
private final Translog translog;
|
||||
|
||||
|
||||
// the number of docs to sniff for mapping information in each type
|
||||
private final int mappingSnifferDocs;
|
||||
|
||||
|
||||
private final Object mutex = new Object();
|
||||
|
||||
private volatile IndexShardState state;
|
||||
|
@ -95,6 +103,8 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
|
|||
this.queryParserService = queryParserService;
|
||||
this.filterCache = filterCache;
|
||||
state = IndexShardState.CREATED;
|
||||
|
||||
this.mappingSnifferDocs = componentSettings.getAsInt("mappingSnifferDocs", 100);
|
||||
}
|
||||
|
||||
public Store store() {
|
||||
|
@ -390,6 +400,7 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
|
|||
synchronized (mutex) {
|
||||
state = IndexShardState.STARTED;
|
||||
}
|
||||
threadPool.execute(new ShardMappingSniffer());
|
||||
scheduleRefresherIfNeeded();
|
||||
}
|
||||
|
||||
|
@ -406,6 +417,7 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
|
|||
synchronized (mutex) {
|
||||
state = IndexShardState.STARTED;
|
||||
}
|
||||
threadPool.execute(new ShardMappingSniffer());
|
||||
scheduleRefresherIfNeeded();
|
||||
}
|
||||
}
|
||||
|
@ -505,4 +517,110 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* The mapping sniffer reads docs from the index and introduces them into the mapping service. This is
|
||||
* because of dynamic fields and we want to reintroduce them.
|
||||
*
|
||||
* <p>Note, this is done on the shard level, we might have other dynamic fields in other shards, but
|
||||
* this will be taken care off in another component.
|
||||
*/
|
||||
private class ShardMappingSniffer implements Runnable {
|
||||
@Override public void run() {
|
||||
engine.refresh(new Engine.Refresh(true));
|
||||
|
||||
TermEnum termEnum = null;
|
||||
Engine.Searcher searcher = searcher();
|
||||
try {
|
||||
List<String> typeNames = newArrayList();
|
||||
termEnum = searcher.reader().terms(new Term(TypeFieldMapper.NAME, ""));
|
||||
while (true) {
|
||||
Term term = termEnum.term();
|
||||
if (term == null) {
|
||||
break;
|
||||
}
|
||||
if (!term.field().equals(TypeFieldMapper.NAME)) {
|
||||
break;
|
||||
}
|
||||
typeNames.add(term.text());
|
||||
termEnum.next();
|
||||
}
|
||||
|
||||
logger.debug("Sniffing mapping for [{}]", typeNames);
|
||||
|
||||
for (final String type : typeNames) {
|
||||
threadPool.execute(new Runnable() {
|
||||
@Override public void run() {
|
||||
Engine.Searcher searcher = searcher();
|
||||
try {
|
||||
Query query = new ConstantScoreQuery(filterCache.cache(new TermFilter(new Term(TypeFieldMapper.NAME, type))));
|
||||
long typeCount = Lucene.count(searcher().searcher(), query, -1);
|
||||
|
||||
int marker = (int) (typeCount / mappingSnifferDocs);
|
||||
if (marker == 0) {
|
||||
marker = 1;
|
||||
}
|
||||
final int fMarker = marker;
|
||||
searcher.searcher().search(query, new Collector() {
|
||||
|
||||
private final FieldSelector fieldSelector = new UidAndSourceFieldSelector();
|
||||
private int counter = 0;
|
||||
private IndexReader reader;
|
||||
|
||||
@Override public void setScorer(Scorer scorer) throws IOException {
|
||||
}
|
||||
|
||||
@Override public void collect(int doc) throws IOException {
|
||||
if (state == IndexShardState.CLOSED) {
|
||||
throw new IOException("CLOSED");
|
||||
}
|
||||
if (++counter == fMarker) {
|
||||
counter = 0;
|
||||
|
||||
Document document = reader.document(doc, fieldSelector);
|
||||
Uid uid = Uid.createUid(document.get(UidFieldMapper.NAME));
|
||||
String source = document.get(SourceFieldMapper.NAME);
|
||||
|
||||
mapperService.type(uid.type()).parse(uid.type(), uid.id(), source);
|
||||
}
|
||||
}
|
||||
|
||||
@Override public void setNextReader(IndexReader reader, int docBase) throws IOException {
|
||||
this.reader = reader;
|
||||
}
|
||||
|
||||
@Override public boolean acceptsDocsOutOfOrder() {
|
||||
return true;
|
||||
}
|
||||
});
|
||||
} catch (IOException e) {
|
||||
if (e.getMessage().equals("CLOSED")) {
|
||||
// ignore, we got closed
|
||||
} else {
|
||||
logger.warn("Failed to sniff mapping for type [" + type + "]", e);
|
||||
}
|
||||
} finally {
|
||||
searcher.release();
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
} catch (IOException e) {
|
||||
if (e.getMessage().equals("CLOSED")) {
|
||||
// ignore, we got closed
|
||||
} else {
|
||||
logger.warn("Failed to sniff mapping", e);
|
||||
}
|
||||
} finally {
|
||||
if (termEnum != null) {
|
||||
try {
|
||||
termEnum.close();
|
||||
} catch (IOException e) {
|
||||
// ignore
|
||||
}
|
||||
}
|
||||
searcher.release();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -21,6 +21,7 @@ package org.elasticsearch.search.fetch;
|
|||
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import org.apache.lucene.document.Document;
|
||||
import org.apache.lucene.document.FieldSelector;
|
||||
import org.apache.lucene.document.Fieldable;
|
||||
import org.elasticsearch.index.mapper.*;
|
||||
import org.elasticsearch.search.SearchHit;
|
||||
|
@ -47,7 +48,7 @@ public class FetchPhase implements SearchPhase {
|
|||
}
|
||||
|
||||
public void execute(SearchContext context) {
|
||||
FieldMappersFieldSelector fieldSelector = buildFieldSelectors(context);
|
||||
FieldSelector fieldSelector = buildFieldSelectors(context);
|
||||
|
||||
SearchHit[] hits = new SearchHit[context.docIdsToLoad().length];
|
||||
int index = 0;
|
||||
|
@ -137,7 +138,7 @@ public class FetchPhase implements SearchPhase {
|
|||
return uid;
|
||||
}
|
||||
|
||||
private Document loadDocument(SearchContext context, FieldMappersFieldSelector fieldSelector, int docId) {
|
||||
private Document loadDocument(SearchContext context, FieldSelector fieldSelector, int docId) {
|
||||
Document doc;
|
||||
try {
|
||||
doc = context.searcher().doc(docId, fieldSelector);
|
||||
|
@ -147,20 +148,19 @@ public class FetchPhase implements SearchPhase {
|
|||
return doc;
|
||||
}
|
||||
|
||||
private FieldMappersFieldSelector buildFieldSelectors(SearchContext context) {
|
||||
FieldMappersFieldSelector fieldSelector = new FieldMappersFieldSelector();
|
||||
if (context.fieldNames() != null) {
|
||||
for (String fieldName : context.fieldNames()) {
|
||||
FieldMappers x = context.mapperService().smartNameFieldMappers(fieldName);
|
||||
if (x == null) {
|
||||
throw new FetchPhaseExecutionException(context, "No mapping for field [" + fieldName + "]");
|
||||
}
|
||||
fieldSelector.add(x);
|
||||
}
|
||||
} else {
|
||||
fieldSelector.add(context.mapperService().sourceFieldMappers());
|
||||
private FieldSelector buildFieldSelectors(SearchContext context) {
|
||||
if (context.fieldNames() == null || context.fieldNames().length == 0) {
|
||||
return new UidAndSourceFieldSelector();
|
||||
}
|
||||
|
||||
FieldMappersFieldSelector fieldSelector = new FieldMappersFieldSelector();
|
||||
for (String fieldName : context.fieldNames()) {
|
||||
FieldMappers x = context.mapperService().smartNameFieldMappers(fieldName);
|
||||
if (x == null) {
|
||||
throw new FetchPhaseExecutionException(context, "No mapping for field [" + fieldName + "]");
|
||||
}
|
||||
fieldSelector.add(x);
|
||||
}
|
||||
// add the uids by default, so we can return the id/type
|
||||
fieldSelector.add(context.mapperService().uidFieldMappers());
|
||||
return fieldSelector;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue