clean search context when a shard/index gets closed. Add an IndicesLifecyle global component that allows to register for such events easily.

This commit is contained in:
kimchy 2010-04-07 21:22:43 +03:00
parent f631e9aded
commit a208eb5a50
8 changed files with 272 additions and 5 deletions

View File

@ -281,7 +281,10 @@ public abstract class TransportSearchTypeAction extends BaseAction<SearchRequest
Map<SearchShardTarget, ExtTIntArrayList> docIdsToLoad) {
for (Map.Entry<SearchShardTarget, QuerySearchResultProvider> entry : queryResults.entrySet()) {
if (!docIdsToLoad.containsKey(entry.getKey())) {
searchService.sendFreeContext(nodes.get(entry.getKey().nodeId()), entry.getValue().id());
Node node = nodes.get(entry.getKey().nodeId());
if (node != null) { // should not happen (==null) but safeguard anyhow
searchService.sendFreeContext(node, entry.getValue().id());
}
}
}
}

View File

@ -50,6 +50,8 @@ import org.elasticsearch.index.similarity.SimilarityService;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.store.StoreModule;
import org.elasticsearch.index.translog.TranslogModule;
import org.elasticsearch.indices.IndicesLifecycle;
import org.elasticsearch.indices.InternalIndicesLifecycle;
import org.elasticsearch.plugins.PluginsService;
import org.elasticsearch.plugins.ShardsPluginsModule;
import org.elasticsearch.util.component.CloseableIndexComponent;
@ -65,7 +67,7 @@ import static com.google.common.collect.Sets.*;
import static org.elasticsearch.util.MapBuilder.*;
/**
* @author kimchy (Shay Banon)
* @author kimchy (shay.banon)
*/
public class InternalIndexService extends AbstractIndexComponent implements IndexService {
@ -75,6 +77,8 @@ public class InternalIndexService extends AbstractIndexComponent implements Inde
private final PluginsService pluginsService;
private final InternalIndicesLifecycle indicesLifecycle;
private final MapperService mapperService;
private final IndexQueryParserService queryParserService;
@ -102,6 +106,7 @@ public class InternalIndexService extends AbstractIndexComponent implements Inde
this.operationRouting = operationRouting;
this.pluginsService = injector.getInstance(PluginsService.class);
this.indicesLifecycle = (InternalIndicesLifecycle) injector.getInstance(IndicesLifecycle.class);
}
@Override public int numberOfShards() {
@ -180,6 +185,8 @@ public class InternalIndexService extends AbstractIndexComponent implements Inde
throw new IndexShardAlreadyExistsException(shardId + " already exists");
}
indicesLifecycle.beforeIndexShardCreated(shardId);
logger.debug("Creating Shard Id [{}]", shardId.id());
Injector shardInjector = injector.createChildInjector(
@ -205,6 +212,8 @@ public class InternalIndexService extends AbstractIndexComponent implements Inde
logger.warn("Failed to clean store on shard creation", e);
}
indicesLifecycle.afterIndexShardCreated(indexShard);
shards = newMapBuilder(shards).put(shardId.id(), indexShard).immutableMap();
return indexShard;
@ -232,6 +241,7 @@ public class InternalIndexService extends AbstractIndexComponent implements Inde
IndexShard indexShard = tmpShardsMap.remove(shardId);
shards = ImmutableMap.copyOf(tmpShardsMap);
indicesLifecycle.beforeIndexShardClosed(indexShard, delete);
for (Class<? extends CloseableIndexComponent> closeable : pluginsService.shardServices()) {
shardInjector.getInstance(closeable).close(delete);
@ -263,6 +273,8 @@ public class InternalIndexService extends AbstractIndexComponent implements Inde
}
Injectors.close(injector);
indicesLifecycle.afterIndexShardClosed(indexShard.shardId(), delete);
}
}

View File

@ -0,0 +1,119 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.indices;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.service.IndexService;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.service.IndexShard;
/**
* A global component allowing to register for lifecycle of an index (create/closed) and
* an index shard (created/closed).
*
* @author kimchy (shay.banon)
*/
public interface IndicesLifecycle {
/**
* Add a listener.
*/
void addListener(Listener listener);
/**
* Remove a listener.
*/
void removeListener(Listener listener);
/**
* A listener for index and index shard lifecycle events (create/closed).
*/
public abstract static class Listener {
/**
* Called before the index gets created.
*/
public void beforeIndexCreated(Index index) {
}
/**
* Called after the index has been created.
*/
public void afterIndexCreated(IndexService indexService) {
}
/**
* Called before the index shard gets created.
*/
public void beforeIndexShardCreated(ShardId shardId) {
}
/**
* Called after the index shard has been created.
*/
public void afterIndexShardCreated(IndexShard indexShard) {
}
/**
* Called before the index get closed.
*
* @param indexService The index service
* @param delete Does the index gets closed because of a delete command, or because the node is shutting down
*/
public void beforeIndexClosed(IndexService indexService, boolean delete) {
}
/**
* Called after the index has been closed.
*
* @param index The index
* @param delete Does the index gets closed because of a delete command, or because the node is shutting down
*/
public void afterIndexClosed(Index index, boolean delete) {
}
/**
* Called before the index shard gets closed.
*
* @param indexShard The index shard
* @param delete Does the index shard gets closed because of a delete command, or because the node is shutting down
*/
public void beforeIndexShardClosed(IndexShard indexShard, boolean delete) {
}
/**
* Called after the index shard has been closed.
*
* @param shardId The shard id
* @param delete Does the index shard gets closed because of a delete command, or because the node is shutting down
*/
public void afterIndexShardClosed(ShardId shardId, boolean delete) {
}
}
}

View File

@ -35,6 +35,7 @@ public class IndicesModule extends AbstractModule {
}
@Override protected void configure() {
bind(IndicesLifecycle.class).to(InternalIndicesLifecycle.class).asEagerSingleton();
bind(IndicesService.class).to(InternalIndicesService.class).asEagerSingleton();
bind(IndicesClusterStateService.class).asEagerSingleton();
bind(IndicesMemoryCleaner.class).asEagerSingleton();

View File

@ -42,6 +42,8 @@ public interface IndicesService extends Iterable<IndexService>, LifecycleCompone
boolean hasIndex(String index);
IndicesLifecycle indicesLifecycle();
Set<String> indices();
IndexService indexService(String index);

View File

@ -0,0 +1,98 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.indices;
import com.google.inject.Inject;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.service.IndexService;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.util.component.AbstractComponent;
import org.elasticsearch.util.settings.Settings;
import java.util.concurrent.CopyOnWriteArrayList;
/**
* @author kimchy (shay.banon)
*/
public class InternalIndicesLifecycle extends AbstractComponent implements IndicesLifecycle {
private final CopyOnWriteArrayList<Listener> listeners = new CopyOnWriteArrayList<Listener>();
@Inject public InternalIndicesLifecycle(Settings settings) {
super(settings);
}
public void addListener(Listener listener) {
listeners.add(listener);
}
public void removeListener(Listener listener) {
listeners.remove(listener);
}
public void beforeIndexCreated(Index index) {
for (Listener listener : listeners) {
listener.beforeIndexCreated(index);
}
}
public void afterIndexCreated(IndexService indexService) {
for (Listener listener : listeners) {
listener.afterIndexCreated(indexService);
}
}
public void beforeIndexShardCreated(ShardId shardId) {
for (Listener listener : listeners) {
listener.beforeIndexShardCreated(shardId);
}
}
public void afterIndexShardCreated(IndexShard indexShard) {
for (Listener listener : listeners) {
listener.afterIndexShardCreated(indexShard);
}
}
public void beforeIndexClosed(IndexService indexService, boolean delete) {
for (Listener listener : listeners) {
listener.beforeIndexClosed(indexService, delete);
}
}
public void afterIndexClosed(Index index, boolean delete) {
for (Listener listener : listeners) {
listener.afterIndexClosed(index, delete);
}
}
public void beforeIndexShardClosed(IndexShard indexShard, boolean delete) {
for (Listener listener : listeners) {
listener.beforeIndexShardClosed(indexShard, delete);
}
}
public void afterIndexShardClosed(ShardId shardId, boolean delete) {
for (Listener listener : listeners) {
listener.afterIndexShardClosed(shardId, delete);
}
}
}

View File

@ -67,6 +67,8 @@ public class InternalIndicesService extends AbstractLifecycleComponent<IndicesSe
private final IndicesClusterStateService clusterStateService;
private final InternalIndicesLifecycle indicesLifecycle;
private final Injector injector;
private final PluginsService pluginsService;
@ -75,9 +77,10 @@ public class InternalIndicesService extends AbstractLifecycleComponent<IndicesSe
private volatile ImmutableMap<String, IndexService> indices = ImmutableMap.of();
@Inject public InternalIndicesService(Settings settings, IndicesClusterStateService clusterStateService, Injector injector) {
@Inject public InternalIndicesService(Settings settings, IndicesClusterStateService clusterStateService, IndicesLifecycle indicesLifecycle, Injector injector) {
super(settings);
this.clusterStateService = clusterStateService;
this.indicesLifecycle = (InternalIndicesLifecycle) indicesLifecycle;
this.injector = injector;
this.pluginsService = injector.getInstance(PluginsService.class);
@ -98,6 +101,10 @@ public class InternalIndicesService extends AbstractLifecycleComponent<IndicesSe
clusterStateService.close();
}
@Override public IndicesLifecycle indicesLifecycle() {
return this.indicesLifecycle;
}
/**
* Returns <tt>true</tt> if changes (adding / removing) indices, shards and so on are allowed.
*/
@ -148,6 +155,8 @@ public class InternalIndicesService extends AbstractLifecycleComponent<IndicesSe
throw new IndexAlreadyExistsException(index);
}
indicesLifecycle.beforeIndexCreated(index);
logger.debug("Creating Index [{}], shards [{}]/[{}]", new Object[]{sIndexName, settings.get(SETTING_NUMBER_OF_SHARDS), settings.get(SETTING_NUMBER_OF_REPLICAS)});
Settings indexSettings = settingsBuilder()
@ -176,6 +185,8 @@ public class InternalIndicesService extends AbstractLifecycleComponent<IndicesSe
IndexService indexService = indexInjector.getInstance(IndexService.class);
indicesLifecycle.afterIndexCreated(indexService);
indices = newMapBuilder(indices).put(index.name(), indexService).immutableMap();
return indexService;
@ -201,6 +212,8 @@ public class InternalIndicesService extends AbstractLifecycleComponent<IndicesSe
IndexService indexService = tmpMap.remove(index);
indices = ImmutableMap.copyOf(tmpMap);
indicesLifecycle.beforeIndexClosed(indexService, delete);
for (Class<? extends CloseableIndexComponent> closeable : pluginsService.indexServices()) {
indexInjector.getInstance(closeable).close(delete);
}
@ -214,5 +227,7 @@ public class InternalIndicesService extends AbstractLifecycleComponent<IndicesSe
indexInjector.getInstance(IndexGateway.class).close(delete);
Injectors.close(injector);
indicesLifecycle.afterIndexClosed(indexService.index(), delete);
}
}

View File

@ -32,6 +32,7 @@ import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.service.IndexService;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.indices.IndicesLifecycle;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.search.dfs.CachedDfSource;
import org.elasticsearch.search.dfs.DfsPhase;
@ -91,6 +92,8 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
private final AtomicLong idGenerator = new AtomicLong();
private final CleanContextOnIndicesLifecycleListener indicesLifecycleListener = new CleanContextOnIndicesLifecycleListener();
private final NonBlockingHashMapLong<SearchContext> activeContexts = new NonBlockingHashMapLong<SearchContext>();
private final ImmutableMap<String, SearchParseElement> elementParsers;
@ -105,13 +108,15 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
this.queryPhase = queryPhase;
this.fetchPhase = fetchPhase;
this.defaultKeepAlive = componentSettings.getAsTime("default_keep_alive", timeValueMinutes(2));
// we can have 5 minutes here, since we make sure to clean with search requests and when shard/index closes
this.defaultKeepAlive = componentSettings.getAsTime("default_keep_alive", timeValueMinutes(5));
Map<String, SearchParseElement> elementParsers = new HashMap<String, SearchParseElement>();
elementParsers.putAll(dfsPhase.parseElements());
elementParsers.putAll(queryPhase.parseElements());
elementParsers.putAll(fetchPhase.parseElements());
this.elementParsers = ImmutableMap.copyOf(elementParsers);
indicesService.indicesLifecycle().addListener(indicesLifecycleListener);
}
@Override protected void doStart() throws ElasticSearchException {
@ -125,6 +130,7 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
}
@Override protected void doClose() throws ElasticSearchException {
indicesService.indicesLifecycle().removeListener(indicesLifecycleListener);
}
public void releaseContextsForIndex(Index index) {
@ -390,7 +396,18 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
}
}
private class KeepAliveTimerTask implements TimerTask {
class CleanContextOnIndicesLifecycleListener extends IndicesLifecycle.Listener {
@Override public void beforeIndexClosed(IndexService indexService, boolean delete) {
releaseContextsForIndex(indexService.index());
}
@Override public void beforeIndexShardClosed(IndexShard indexShard, boolean delete) {
releaseContextsForShard(indexShard.shardId());
}
}
class KeepAliveTimerTask implements TimerTask {
private final SearchContext context;