An already loaded SimpleIdReaderCache should be reloaded when a new `_parent` field has been introduced.

Closes #4595
Relates #4568
Martijn van Groningen 2013-12-31 00:44:45 +01:00
parent fbae6e940b
commit 38f038f899
7 changed files with 179 additions and 32 deletions
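
In outline, the fix works like this: SimpleIdCache registers itself as a document type listener, so that merging a mapping with an active `_parent` field invalidates the per-segment id caches that were built before the parent type existed. A minimal, self-contained sketch of that invalidation pattern (TypeRegistry, TypeListener and InvalidatingIdCache are illustrative stand-ins, not the actual Elasticsearch API):

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

// Illustrative stand-ins for MapperService and DocumentTypeListener.
interface TypeListener {
    // parentType == null means the new type declares no _parent field.
    void beforeCreate(String type, String parentType);
}

class TypeRegistry {
    private final List<TypeListener> listeners = new CopyOnWriteArrayList<TypeListener>();

    void addListener(TypeListener listener) {
        listeners.add(listener);
    }

    void addType(String type, String parentType) {
        // Notify listeners before the type becomes visible, mirroring MapperService.merge().
        for (TypeListener listener : listeners) {
            listener.beforeCreate(type, parentType);
        }
    }
}

class InvalidatingIdCache implements TypeListener {
    private final Map<Object, Object> perSegmentCaches = new ConcurrentHashMap<Object, Object>();

    @Override
    public void beforeCreate(String type, String parentType) {
        if (parentType != null) {
            // A new _parent relation means caches built without it are stale.
            perSegmentCaches.clear();
        }
    }
}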


@@ -34,6 +34,8 @@ import org.elasticsearch.index.AbstractIndexComponent;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.cache.id.IdCache;
import org.elasticsearch.index.cache.id.IdReaderCache;
import org.elasticsearch.index.mapper.DocumentMapper;
import org.elasticsearch.index.mapper.DocumentTypeListener;
import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.mapper.internal.ParentFieldMapper;
import org.elasticsearch.index.mapper.internal.UidFieldMapper;
@@ -45,31 +47,36 @@ import org.elasticsearch.index.shard.service.IndexShard;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicReference;
/**
*
*/
public class SimpleIdCache extends AbstractIndexComponent implements IdCache, SegmentReader.CoreClosedListener {
public class SimpleIdCache extends AbstractIndexComponent implements IdCache, SegmentReader.CoreClosedListener, DocumentTypeListener {
private final ConcurrentMap<Object, SimpleIdReaderCache> idReaders;
private final boolean reuse;
private final ConcurrentMap<Object, SimpleIdReaderCache> idReaders;
private final AtomicReference<NavigableSet<HashedBytesArray>> parentTypesHolder;
IndexService indexService;
@Inject
public SimpleIdCache(Index index, @IndexSettings Settings indexSettings) {
super(index, indexSettings);
reuse = componentSettings.getAsBoolean("reuse", false);
idReaders = ConcurrentCollections.newConcurrentMap();
this.reuse = componentSettings.getAsBoolean("reuse", false);
parentTypesHolder = new AtomicReference<NavigableSet<HashedBytesArray>>(new TreeSet<HashedBytesArray>(UTF8SortedAsUnicodeComparator.utf8SortedAsUnicodeSortOrder));
}
@Override
public void setIndexService(IndexService indexService) {
this.indexService = indexService;
indexService.mapperService().addTypeListener(this);
}
@Override
public void close() throws ElasticSearchException {
indexService.mapperService().removeTypeListener(this);
clear();
}
@@ -117,21 +124,12 @@ public class SimpleIdCache extends AbstractIndexComponent implements IdCache, Se
// do the refresh
Map<Object, Map<String, TypeBuilder>> builders = new HashMap<Object, Map<String, TypeBuilder>>();
Map<Object, IndexReader> cacheToReader = new HashMap<Object, IndexReader>();
// We don't want to load the uids of child documents; this allows us to avoid loading uids for child types.
NavigableSet<HashedBytesArray> parentTypes = new TreeSet<HashedBytesArray>(UTF8SortedAsUnicodeComparator.utf8SortedAsUnicodeSortOrder);
BytesRef spare = new BytesRef();
for (String type : indexService.mapperService().types()) {
ParentFieldMapper parentFieldMapper = indexService.mapperService().documentMapper(type).parentFieldMapper();
if (parentFieldMapper.active()) {
parentTypes.add(new HashedBytesArray(Strings.toUTF8Bytes(parentFieldMapper.type(), spare)));
}
}
NavigableSet<HashedBytesArray> parentTypes = this.parentTypesHolder.get();
// first, go over and load all the id->doc map for all types
for (AtomicReaderContext context : atomicReaderContexts) {
AtomicReader reader = context.reader();
if (idReaders.containsKey(reader.getCoreCacheKey())) {
if (!refreshNeeded(context)) {
// no need, continue
continue;
}
@@ -150,6 +148,7 @@ public class SimpleIdCache extends AbstractIndexComponent implements IdCache, Se
DocsEnum docsEnum = null;
uid: for (BytesRef term = termsEnum.next(); term != null; term = termsEnum.next()) {
HashedBytesArray[] typeAndId = Uid.splitUidIntoTypeAndId(term);
// We don't want to load the uids of child documents; this allows us to avoid loading uids for child types.
if (!parentTypes.contains(typeAndId[0])) {
do {
HashedBytesArray nextParent = parentTypes.ceiling(typeAndId[0]);
@@ -189,10 +188,9 @@ public class SimpleIdCache extends AbstractIndexComponent implements IdCache, Se
}
// now, go and load the docId->parentId map
for (AtomicReaderContext context : atomicReaderContexts) {
AtomicReader reader = context.reader();
if (idReaders.containsKey(reader.getCoreCacheKey())) {
if (!refreshNeeded(context)) {
// no need, continue
continue;
}
@@ -295,13 +293,41 @@ public class SimpleIdCache extends AbstractIndexComponent implements IdCache, Se
private boolean refreshNeeded(List<AtomicReaderContext> atomicReaderContexts) {
for (AtomicReaderContext atomicReaderContext : atomicReaderContexts) {
if (!idReaders.containsKey(atomicReaderContext.reader().getCoreCacheKey())) {
if (refreshNeeded(atomicReaderContext)) {
return true;
}
}
return false;
}
private boolean refreshNeeded(AtomicReaderContext atomicReaderContext) {
return !idReaders.containsKey(atomicReaderContext.reader().getCoreCacheKey());
}
@Override
public void beforeCreate(DocumentMapper mapper) {
NavigableSet<HashedBytesArray> parentTypes = parentTypesHolder.get();
ParentFieldMapper parentFieldMapper = mapper.parentFieldMapper();
if (parentFieldMapper.active()) {
// A _parent field can never be added to an existing mapping, so a _parent field either exists on
// a newly created type or doesn't exist at all. This is why we can update the known parent types via DocumentTypeListener.
if (parentTypes.add(new HashedBytesArray(Strings.toUTF8Bytes(parentFieldMapper.type(), new BytesRef())))) {
parentTypesHolder.set(parentTypes);
clear();
}
}
}
@Override
public void afterRemove(DocumentMapper mapper) {
NavigableSet<HashedBytesArray> parentTypes = parentTypesHolder.get();
ParentFieldMapper parentFieldMapper = mapper.parentFieldMapper();
if (parentFieldMapper.active()) {
parentTypes.remove(new HashedBytesArray(Strings.toUTF8Bytes(parentFieldMapper.type(), new BytesRef())));
parentTypesHolder.set(parentTypes);
}
}
static class TypeBuilder {
final ObjectIntOpenHashMap<HashedBytesArray> idToDoc = new ObjectIntOpenHashMap<HashedBytesArray>();
final HashedBytesArray[] docToId;
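
Two details of the SimpleIdCache change above are easy to miss. First, the refresh path no longer rebuilds the parent-type set by scanning every mapping; it reads the precomputed set from parentTypesHolder, which the beforeCreate/afterRemove callbacks keep current. Second, the callbacks mutate the TreeSet in place and then re-publish the same reference through the AtomicReference. A sketch of that holder pattern, using plain String keys instead of HashedBytesArray (names are illustrative):

import java.util.NavigableSet;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicReference;

class ParentTypesSketch {
    private final AtomicReference<NavigableSet<String>> parentTypesHolder =
            new AtomicReference<NavigableSet<String>>(new TreeSet<String>());

    // Mapping-update path: called when a type with an active _parent appears.
    void onParentTypeAdded(String parentType) {
        NavigableSet<String> parentTypes = parentTypesHolder.get();
        if (parentTypes.add(parentType)) {
            parentTypesHolder.set(parentTypes); // re-publish so the refresh path sees the update
        }
    }

    // Refresh path: callers only iterate the set, they never mutate it.
    NavigableSet<String> currentParentTypes() {
        return parentTypesHolder.get();
    }
}

Note that re-publishing the same mutated set is not copy-on-write: a refresh that runs concurrently with a mapping update could observe the set mid-mutation. Copying into a fresh TreeSet before adding would close that window; the clear() that follows the publication at least forces stale caches to be rebuilt.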


@@ -26,15 +26,15 @@ public interface DocumentTypeListener {
/**
* Invoked just before a new document type is created.
*
* @param type The new document type
* @param mapper The new document mapper of the type being added
*/
void beforeCreate(String type);
void beforeCreate(DocumentMapper mapper);
/**
* Invoked just after an existing document type has been removed.
*
* @param type The existing document type
* @param mapper The existing document mapper of the type being removed
*/
void afterRemove(String type);
void afterRemove(DocumentMapper mapper);
}
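
The interface change here, from the bare type name to the DocumentMapper, is what makes the SimpleIdCache listener possible: a listener can inspect the full mapping, for instance whether _parent is active and which type it points at, without looking the type back up in MapperService. A sketch of a listener that uses the richer callback (the listener class is hypothetical; the accessors are the ones SimpleIdCache uses above):

import org.elasticsearch.index.mapper.DocumentMapper;
import org.elasticsearch.index.mapper.DocumentTypeListener;
import org.elasticsearch.index.mapper.internal.ParentFieldMapper;

// Hypothetical listener; parentFieldMapper(), active() and type() match the calls in SimpleIdCache.
class ParentAwareTypeListener implements DocumentTypeListener {

    @Override
    public void beforeCreate(DocumentMapper mapper) {
        ParentFieldMapper parentFieldMapper = mapper.parentFieldMapper();
        if (parentFieldMapper.active()) {
            System.out.println(mapper.type() + " declares _parent type " + parentFieldMapper.type());
        }
    }

    @Override
    public void afterRemove(DocumentMapper mapper) {
        // Nothing to clean up in this sketch.
    }
}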


@@ -295,7 +295,7 @@ public class MapperService extends AbstractIndexComponent implements Iterable<Do
mapper.addObjectMapperListener(objectMapperListener, false);
for (DocumentTypeListener typeListener : typeListeners) {
typeListener.beforeCreate(mapper.type());
typeListener.beforeCreate(mapper);
}
mappers = newMapBuilder(mappers).put(mapper.type(), mapper).map();
return mapper;
@@ -339,7 +339,7 @@ public class MapperService extends AbstractIndexComponent implements Iterable<Do
mappers = newMapBuilder(mappers).remove(type).map();
removeObjectAndFieldMappers(docMapper);
for (DocumentTypeListener typeListener : typeListeners) {
typeListener.afterRemove(type);
typeListener.afterRemove(docMapper);
}
}
}


@@ -20,6 +20,7 @@ import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.fielddata.IndexFieldDataService;
import org.elasticsearch.index.indexing.IndexingOperationListener;
import org.elasticsearch.index.indexing.ShardIndexingService;
import org.elasticsearch.index.mapper.DocumentMapper;
import org.elasticsearch.index.mapper.DocumentTypeListener;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.internal.TypeFieldMapper;
@@ -191,15 +192,15 @@ public class PercolatorQueriesRegistry extends AbstractIndexShardComponent {
private class PercolateTypeListener implements DocumentTypeListener {
@Override
public void beforeCreate(String type) {
if (PercolatorService.TYPE_NAME.equals(type)) {
public void beforeCreate(DocumentMapper mapper) {
if (PercolatorService.TYPE_NAME.equals(mapper.type())) {
enableRealTimePercolator();
}
}
@Override
public void afterRemove(String type) {
if (PercolatorService.TYPE_NAME.equals(type)) {
public void afterRemove(DocumentMapper mapper) {
if (PercolatorService.TYPE_NAME.equals(mapper.type())) {
disableRealTimePercolator();
clear();
}


@@ -263,6 +263,7 @@ public class SimpleIdCacheTests extends ElasticsearchTestCase {
Index index = new Index("test");
SimpleIdCache idCache = new SimpleIdCache(index, settings);
MapperService mapperService = MapperTestUtils.newMapperService();
idCache.setIndexService(new StubIndexService(mapperService));
for (Tuple<String, String> documentType : documentTypes) {
String defaultMapping = XContentFactory.jsonBuilder().startObject().startObject(documentType.v1())
@@ -271,7 +272,6 @@ public class SimpleIdCacheTests extends ElasticsearchTestCase {
mapperService.merge(documentType.v1(), new CompressedString(defaultMapping), true);
}
idCache.setIndexService(new StubIndexService(mapperService));
return idCache;
}
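
The reordering in this test is the point of the change: setIndexService is where SimpleIdCache registers itself as a type listener, so it must run before any mappings are merged; in the old order, beforeCreate was never fired for the _parent mapping and the cache never learned the parent type. In terms of the illustrative classes from the first sketch, the required ordering is:

// Register the listener before adding types; the reverse order silently
// drops the beforeCreate() notification for the _parent mapping.
TypeRegistry registry = new TypeRegistry();              // stands in for MapperService
InvalidatingIdCache idCache = new InvalidatingIdCache(); // stands in for SimpleIdCache
registry.addListener(idCache);                           // like idCache.setIndexService(...)
registry.addType("child", "parent");                     // like mapperService.merge(...)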


@@ -328,11 +328,12 @@ public class ChildrenConstantScoreQueryTests extends ElasticsearchLuceneTestCase
final CacheRecycler cacheRecycler = new CacheRecycler(ImmutableSettings.EMPTY);
Settings settings = ImmutableSettings.EMPTY;
MapperService mapperService = MapperTestUtils.newMapperService(index, settings);
final IndexService indexService = new SimpleIdCacheTests.StubIndexService(mapperService);
idCache.setIndexService(indexService);
// The id cache is now registered as a document type listener, so we can add mappings.
mapperService.merge(
childType, new CompressedString(PutMappingRequest.buildFromSimplifiedDef(childType, "_parent", "type=" + parentType).string()), true
);
final IndexService indexService = new SimpleIdCacheTests.StubIndexService(mapperService);
idCache.setIndexService(indexService);
ThreadPool threadPool = new ThreadPool();
NodeSettingsService nodeSettingsService = new NodeSettingsService(settings);


@@ -27,6 +27,7 @@ import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
import org.elasticsearch.action.count.CountResponse;
import org.elasticsearch.action.explain.ExplainResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.search.SearchPhaseExecutionException;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.action.search.ShardSearchFailure;
@@ -36,6 +37,7 @@ import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.index.mapper.MergeMappingException;
import org.elasticsearch.index.query.*;
import org.elasticsearch.index.search.child.ScoreType;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.facet.terms.TermsFacet;
import org.elasticsearch.search.sort.SortBuilders;
import org.elasticsearch.search.sort.SortOrder;
@@ -54,8 +56,7 @@ import static org.elasticsearch.index.query.FilterBuilders.*;
import static org.elasticsearch.index.query.QueryBuilders.*;
import static org.elasticsearch.index.query.functionscore.ScoreFunctionBuilders.scriptFunction;
import static org.elasticsearch.search.facet.FacetBuilders.termsFacet;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.*;
import static org.hamcrest.Matchers.*;
/**
@@ -2052,6 +2053,124 @@ public class SimpleChildQuerySearchTests extends ElasticsearchIntegrationTest {
assertThat(searchResponse.getHits().getAt(0).getMatchedQueries()[0], equalTo("test"));
}
@Test
public void testParentChildQueriesNoParentType() throws Exception {
client().admin().indices().prepareCreate("test")
.setSettings(ImmutableSettings.settingsBuilder()
.put("index.number_of_shards", 1)
.put("index.refresh_interval", -1)
.put("index.number_of_replicas", 0))
.execute().actionGet();
client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().execute().actionGet();
String parentId = "p1";
client().prepareIndex("test", "parent", parentId).setSource("p_field", "1").execute().actionGet();
client().admin().indices().prepareRefresh().get();
try {
client().prepareSearch("test")
.setQuery(hasChildQuery("child", termQuery("c_field", "1")))
.execute().actionGet();
fail();
} catch (SearchPhaseExecutionException e) {
assertThat(e.status(), equalTo(RestStatus.BAD_REQUEST));
}
try {
client().prepareSearch("test")
.setQuery(hasChildQuery("child", termQuery("c_field", "1")).scoreType("max"))
.execute().actionGet();
fail();
} catch (SearchPhaseExecutionException e) {
assertThat(e.status(), equalTo(RestStatus.BAD_REQUEST));
}
try {
client().prepareSearch("test")
.setPostFilter(hasChildFilter("child", termQuery("c_field", "1")))
.execute().actionGet();
fail();
} catch (SearchPhaseExecutionException e) {
assertThat(e.status(), equalTo(RestStatus.BAD_REQUEST));
}
try {
client().prepareSearch("test")
.setQuery(topChildrenQuery("child", termQuery("c_field", "1")).score("max"))
.execute().actionGet();
fail();
} catch (SearchPhaseExecutionException e) {
assertThat(e.status(), equalTo(RestStatus.BAD_REQUEST));
}
// Can't fail, because there is no check; a parent type can be referred to by many child types.
client().prepareSearch("test")
.setQuery(hasParentQuery("parent", termQuery("p_field", "1")).scoreType("score"))
.execute().actionGet();
client().prepareSearch("test")
.setPostFilter(hasParentFilter("parent", termQuery("p_field", "1")))
.execute().actionGet();
}
@Test
public void testAdd_ParentFieldAfterIndexingParentDocButBeforeIndexingChildDoc() throws Exception {
client().admin().indices().prepareCreate("test")
.setSettings(ImmutableSettings.settingsBuilder()
.put("index.number_of_shards", 1)
.put("index.refresh_interval", -1)
.put("index.number_of_replicas", 0))
.execute().actionGet();
client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().execute().actionGet();
String parentId = "p1";
client().prepareIndex("test", "parent", parentId).setSource("p_field", "1").execute().actionGet();
client().admin().indices().prepareRefresh().get();
assertAcked(client().admin()
.indices()
.preparePutMapping("test")
.setType("child")
.setSource("_parent", "type=parent"));
client().prepareIndex("test", "child", "c1").setSource("c_field", "1").setParent(parentId).execute().actionGet();
client().admin().indices().prepareRefresh().get();
SearchResponse searchResponse = client().prepareSearch("test")
.setQuery(hasChildQuery("child", termQuery("c_field", "1")))
.execute().actionGet();
assertHitCount(searchResponse, 1L);
assertSearchHits(searchResponse, parentId);
searchResponse = client().prepareSearch("test")
.setQuery(hasChildQuery("child", termQuery("c_field", "1")).scoreType("max"))
.execute().actionGet();
assertHitCount(searchResponse, 1L);
assertSearchHits(searchResponse, parentId);
searchResponse = client().prepareSearch("test")
.setPostFilter(hasChildFilter("child", termQuery("c_field", "1")))
.execute().actionGet();
assertHitCount(searchResponse, 1L);
assertSearchHits(searchResponse, parentId);
searchResponse = client().prepareSearch("test")
.setQuery(topChildrenQuery("child", termQuery("c_field", "1")).score("max"))
.execute().actionGet();
assertHitCount(searchResponse, 1L);
assertSearchHits(searchResponse, parentId);
searchResponse = client().prepareSearch("test")
.setPostFilter(hasParentFilter("parent", termQuery("p_field", "1")))
.execute().actionGet();
assertHitCount(searchResponse, 1L);
assertSearchHits(searchResponse, "c1");
searchResponse = client().prepareSearch("test")
.setQuery(hasParentQuery("parent", termQuery("p_field", "1")).scoreType("score"))
.execute().actionGet();
assertHitCount(searchResponse, 1L);
assertSearchHits(searchResponse, "c1");
}
private static HasChildFilterBuilder hasChildFilter(String type, QueryBuilder queryBuilder) {
HasChildFilterBuilder hasChildFilterBuilder = FilterBuilders.hasChildFilter(type, queryBuilder);
hasChildFilterBuilder.setShortCircuitCutoff(randomInt(10));