imrpove caching and fix delete mapping logic

This commit is contained in:
kimchy 2010-09-17 01:25:37 +02:00
parent 08d7125cd5
commit ff081240eb
12 changed files with 100 additions and 31 deletions

View File

@ -22,6 +22,8 @@ package org.elasticsearch.action.admin.indices.mapping.delete;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.TransportActions;
import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
import org.elasticsearch.action.admin.indices.refresh.TransportRefreshAction;
import org.elasticsearch.action.deletebyquery.DeleteByQueryResponse;
import org.elasticsearch.action.deletebyquery.TransportDeleteByQueryAction;
import org.elasticsearch.action.support.master.TransportMasterNodeOperationAction;
@ -52,11 +54,15 @@ public class TransportDeleteMappingAction extends TransportMasterNodeOperationAc
private final TransportDeleteByQueryAction deleteByQueryAction;
private final TransportRefreshAction refreshAction;
@Inject public TransportDeleteMappingAction(Settings settings, TransportService transportService, ClusterService clusterService,
ThreadPool threadPool, MetaDataMappingService metaDataMappingService, TransportDeleteByQueryAction deleteByQueryAction) {
ThreadPool threadPool, MetaDataMappingService metaDataMappingService,
TransportDeleteByQueryAction deleteByQueryAction, TransportRefreshAction refreshAction) {
super(settings, transportService, clusterService, threadPool);
this.metaDataMappingService = metaDataMappingService;
this.deleteByQueryAction = deleteByQueryAction;
this.refreshAction = refreshAction;
}
@ -87,10 +93,19 @@ public class TransportDeleteMappingAction extends TransportMasterNodeOperationAc
final CountDownLatch latch = new CountDownLatch(1);
deleteByQueryAction.execute(Requests.deleteByQueryRequest(request.indices()).query(QueryBuilders.filtered(QueryBuilders.matchAllQuery(), FilterBuilders.termFilter(TypeFieldMapper.NAME, request.type()))), new ActionListener<DeleteByQueryResponse>() {
@Override public void onResponse(DeleteByQueryResponse deleteByQueryResponse) {
refreshAction.execute(Requests.refreshRequest(request.indices()), new ActionListener<RefreshResponse>() {
@Override public void onResponse(RefreshResponse refreshResponse) {
metaDataMappingService.removeMapping(new MetaDataMappingService.RemoveRequest(request.indices(), request.type()));
latch.countDown();
}
@Override public void onFailure(Throwable e) {
metaDataMappingService.removeMapping(new MetaDataMappingService.RemoveRequest(request.indices(), request.type()));
latch.countDown();
}
});
}
@Override public void onFailure(Throwable e) {
failureRef.set(e);
latch.countDown();

View File

@ -43,4 +43,22 @@ public class Tuple<V1, V2> {
public V2 v2() {
return v2;
}
@Override public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Tuple tuple = (Tuple) o;
if (v1 != null ? !v1.equals(tuple.v1) : tuple.v1 != null) return false;
if (v2 != null ? !v2.equals(tuple.v2) : tuple.v2 != null) return false;
return true;
}
@Override public int hashCode() {
int result = v1 != null ? v1.hashCode() : 0;
result = 31 * result + (v2 != null ? v2.hashCode() : 0);
return result;
}
}

View File

@ -35,9 +35,11 @@ import java.util.concurrent.ConcurrentMap;
public class SoftFieldDataCache extends AbstractConcurrentMapFieldDataCache {
@Inject public SoftFieldDataCache(Index index, @IndexSettings Settings indexSettings) {
super(index, indexSettings, new MapMaker()
.softKeys()
.<Object, ConcurrentMap<String, FieldData>>makeMap());
super(index, indexSettings);
}
@Override protected ConcurrentMap<String, FieldData> buildFieldDataMap() {
return new MapMaker().softValues().makeMap();
}
@Override public String type() {

View File

@ -21,6 +21,7 @@ package org.elasticsearch.index.cache.field.data.support;
import org.apache.lucene.index.IndexReader;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.common.collect.MapMaker;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.index.AbstractIndexComponent;
@ -41,10 +42,11 @@ public abstract class AbstractConcurrentMapFieldDataCache extends AbstractIndexC
private final Object creationMutex = new Object();
protected AbstractConcurrentMapFieldDataCache(Index index, @IndexSettings Settings indexSettings,
ConcurrentMap<Object, ConcurrentMap<String, FieldData>> cache) {
protected AbstractConcurrentMapFieldDataCache(Index index, @IndexSettings Settings indexSettings) {
super(index, indexSettings);
this.cache = cache;
// weak keys is fine, it will only be cleared once IndexReader references will be removed
// (assuming clear(...) will not be called)
this.cache = new MapMaker().weakKeys().makeMap();
}
@Override public void close() throws ElasticSearchException {
@ -56,7 +58,11 @@ public abstract class AbstractConcurrentMapFieldDataCache extends AbstractIndexC
}
@Override public void clear(IndexReader reader) {
cache.remove(reader.getFieldCacheKey());
ConcurrentMap<String, FieldData> map = cache.remove(reader.getFieldCacheKey());
// help soft/weak handling GC
if (map != null) {
map.clear();
}
}
@Override public void clearUnreferenced() {
@ -67,7 +73,7 @@ public abstract class AbstractConcurrentMapFieldDataCache extends AbstractIndexC
return cache(type.fieldDataClass(), reader, fieldName);
}
protected ConcurrentMap<String, FieldData> buildFilterMap() {
protected ConcurrentMap<String, FieldData> buildFieldDataMap() {
return ConcurrentCollections.newConcurrentMap();
}
@ -77,7 +83,7 @@ public abstract class AbstractConcurrentMapFieldDataCache extends AbstractIndexC
synchronized (creationMutex) {
fieldDataCache = cache.get(reader.getFieldCacheKey());
if (fieldDataCache == null) {
fieldDataCache = buildFilterMap();
fieldDataCache = buildFieldDataMap();
cache.put(reader.getFieldCacheKey(), fieldDataCache);
}
}

View File

@ -35,9 +35,11 @@ import java.util.concurrent.ConcurrentMap;
public class WeakFieldDataCache extends AbstractConcurrentMapFieldDataCache {
@Inject public WeakFieldDataCache(Index index, @IndexSettings Settings indexSettings) {
super(index, indexSettings, new MapMaker()
.weakKeys()
.<Object, ConcurrentMap<String, FieldData>>makeMap());
super(index, indexSettings);
}
@Override protected ConcurrentMap<String, FieldData> buildFieldDataMap() {
return new MapMaker().weakValues().makeMap();
}
@Override public String type() {

View File

@ -22,7 +22,7 @@ package org.elasticsearch.index.cache.filter;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.inject.Scopes;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.cache.filter.weak.WeakFilterCache;
import org.elasticsearch.index.cache.filter.soft.SoftFilterCache;
/**
* @author kimchy (Shay Banon)
@ -41,7 +41,7 @@ public class FilterCacheModule extends AbstractModule {
@Override protected void configure() {
bind(FilterCache.class)
.to(settings.getAsClass(FilterCacheSettings.FILTER_CACHE_TYPE, WeakFilterCache.class, "org.elasticsearch.index.cache.filter.", "FilterCache"))
.to(settings.getAsClass(FilterCacheSettings.FILTER_CACHE_TYPE, SoftFilterCache.class, "org.elasticsearch.index.cache.filter.", "FilterCache"))
.in(Scopes.SINGLETON);
}
}

View File

@ -38,7 +38,13 @@ import java.util.concurrent.ConcurrentMap;
public class SoftFilterCache extends AbstractConcurrentMapFilterCache {
@Inject public SoftFilterCache(Index index, @IndexSettings Settings indexSettings) {
super(index, indexSettings, new MapMaker().softKeys().<Object, ConcurrentMap<Filter, DocSet>>makeMap());
super(index, indexSettings);
}
@Override protected ConcurrentMap<Filter, DocSet> buildFilterMap() {
// DocSet are not really stored with strong reference only when searching on them...
// Filter might be stored in query cache
return new MapMaker().softValues().makeMap();
}
@Override public String type() {

View File

@ -22,6 +22,7 @@ package org.elasticsearch.index.cache.filter.support;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.search.DocIdSet;
import org.apache.lucene.search.Filter;
import org.elasticsearch.common.collect.MapMaker;
import org.elasticsearch.common.lucene.docset.DocSet;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.AbstractIndexComponent;
@ -44,10 +45,11 @@ public abstract class AbstractConcurrentMapFilterCache extends AbstractIndexComp
final ConcurrentMap<Object, ConcurrentMap<Filter, DocSet>> cache;
protected AbstractConcurrentMapFilterCache(Index index, @IndexSettings Settings indexSettings,
ConcurrentMap<Object, ConcurrentMap<Filter, DocSet>> cache) {
protected AbstractConcurrentMapFilterCache(Index index, @IndexSettings Settings indexSettings) {
super(index, indexSettings);
this.cache = cache;
// weak keys is fine, it will only be cleared once IndexReader references will be removed
// (assuming clear(...) will not be called)
this.cache = new MapMaker().weakKeys().makeMap();
}
@Override public void close() {
@ -59,7 +61,11 @@ public abstract class AbstractConcurrentMapFilterCache extends AbstractIndexComp
}
@Override public void clear(IndexReader reader) {
cache.remove(reader.getFieldCacheKey());
ConcurrentMap<Filter, DocSet> map = cache.remove(reader.getFieldCacheKey());
// help soft/weak handling GC
if (map != null) {
map.clear();
}
}
@Override public void clearUnreferenced() {

View File

@ -38,7 +38,13 @@ import java.util.concurrent.ConcurrentMap;
public class WeakFilterCache extends AbstractConcurrentMapFilterCache {
@Inject public WeakFilterCache(Index index, @IndexSettings Settings indexSettings) {
super(index, indexSettings, new MapMaker().weakKeys().<Object, ConcurrentMap<Filter, DocSet>>makeMap());
super(index, indexSettings);
}
@Override protected ConcurrentMap<Filter, DocSet> buildFilterMap() {
// DocSet are not really stored with strong reference only when searching on them...
// Filter might be stored in query cache
return new MapMaker().weakValues().makeMap();
}
@Override public String type() {

View File

@ -35,11 +35,13 @@ import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.collect.ImmutableMap;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.compress.CompressedString;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.index.IndexShardAlreadyExistsException;
import org.elasticsearch.index.IndexShardMissingException;
import org.elasticsearch.index.gateway.IndexShardGatewayRecoveryException;
@ -58,6 +60,7 @@ import org.elasticsearch.threadpool.ThreadPool;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import static org.elasticsearch.ExceptionsHelper.*;
import static org.elasticsearch.common.collect.Sets.*;
@ -83,6 +86,9 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
private final NodeMappingCreatedAction nodeMappingCreatedAction;
// a map of mappings type we have seen per index
private final ConcurrentMap<Tuple<String, String>, Boolean> seenMappings = ConcurrentCollections.newConcurrentMap();
@Inject public IndicesClusterStateService(Settings settings, IndicesService indicesService, ClusterService clusterService,
ThreadPool threadPool, RecoveryTarget recoveryTarget, ShardStateAction shardStateAction,
NodeIndexCreatedAction nodeIndexCreatedAction, NodeIndexDeletedAction nodeIndexDeletedAction,
@ -199,6 +205,9 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
for (Map.Entry<String, CompressedString> entry : mappings.entrySet()) {
String mappingType = entry.getKey();
CompressedString mappingSource = entry.getValue();
if (!seenMappings.containsKey(new Tuple<String, String>(index, mappingType))) {
seenMappings.put(new Tuple<String, String>(index, mappingType), true);
}
try {
if (!mapperService.hasMapping(mappingType)) {
@ -224,8 +233,8 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
}
// go over and remove mappings
for (DocumentMapper documentMapper : mapperService) {
if (!mappings.containsKey(documentMapper.type())) {
// we have it in our mappings, but not in the metadata, remove it
if (seenMappings.containsKey(new Tuple<String, String>(index, documentMapper.type())) && !mappings.containsKey(documentMapper.type())) {
// we have it in our mappings, but not in the metadata, and we have seen it in the cluster state, remove it
mapperService.remove(documentMapper.type());
}
}

View File

@ -54,8 +54,6 @@ import org.elasticsearch.gateway.GatewayModule;
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.http.HttpServer;
import org.elasticsearch.http.HttpServerModule;
import org.elasticsearch.indexer.IndexerManager;
import org.elasticsearch.indexer.IndexersModule;
import org.elasticsearch.indices.IndicesModule;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.cluster.IndicesClusterStateService;
@ -134,7 +132,7 @@ public final class InternalNode implements Node {
if (settings.getAsBoolean("http.enabled", true)) {
modules.add(new HttpServerModule(settings));
}
modules.add(new IndexersModule(settings));
// modules.add(new IndexersModule(settings));
modules.add(new IndicesModule(settings));
modules.add(new SearchModule());
modules.add(new TransportActionModule());
@ -171,7 +169,7 @@ public final class InternalNode implements Node {
injector.getInstance(IndicesService.class).start();
injector.getInstance(IndicesClusterStateService.class).start();
injector.getInstance(IndexerManager.class).start();
// injector.getInstance(IndexerManager.class).start();
injector.getInstance(ClusterService.class).start();
injector.getInstance(RoutingService.class).start();
injector.getInstance(SearchService.class).start();
@ -209,7 +207,7 @@ public final class InternalNode implements Node {
injector.getInstance(MonitorService.class).stop();
injector.getInstance(GatewayService.class).stop();
injector.getInstance(SearchService.class).stop();
injector.getInstance(IndexerManager.class).stop();
// injector.getInstance(IndexerManager.class).stop();
injector.getInstance(IndicesClusterStateService.class).stop();
injector.getInstance(IndicesService.class).stop();
injector.getInstance(RestController.class).stop();
@ -256,7 +254,7 @@ public final class InternalNode implements Node {
stopWatch.stop().start("search");
injector.getInstance(SearchService.class).close();
stopWatch.stop().start("indexers");
injector.getInstance(IndexerManager.class).close();
// injector.getInstance(IndexerManager.class).close();
stopWatch.stop().start("indices_cluster");
injector.getInstance(IndicesClusterStateService.class).close();
stopWatch.stop().start("indices");

View File

@ -122,6 +122,7 @@ public class SimpleFacetsTests extends AbstractNodesTests {
@Test public void testTermsIndexFacet() throws Exception {
try {
client.admin().indices().prepareDelete("test").execute().actionGet();
client.admin().indices().prepareDelete("test1").execute().actionGet();
client.admin().indices().prepareDelete("test2").execute().actionGet();
} catch (Exception e) {