better cleanup on shard startup failure

This commit is contained in:
kimchy 2010-07-08 19:20:03 +03:00
parent b69fc265c5
commit 4429a61528
4 changed files with 43 additions and 16 deletions

View File

@ -64,6 +64,7 @@ import org.elasticsearch.indices.InternalIndicesLifecycle;
import org.elasticsearch.plugins.PluginsService;
import org.elasticsearch.plugins.ShardsPluginsModule;
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.List;
import java.util.Map;
@ -277,29 +278,51 @@ public class InternalIndexService extends AbstractIndexComponent implements Inde
IndexShard indexShard = tmpShardsMap.remove(shardId);
shards = ImmutableMap.copyOf(tmpShardsMap);
indicesLifecycle.beforeIndexShardClosed(indexShard, delete);
ShardId sId = new ShardId(index, shardId);
indicesLifecycle.beforeIndexShardClosed(sId, indexShard, delete);
for (Class<? extends CloseableIndexComponent> closeable : pluginsService.shardServices()) {
shardInjector.getInstance(closeable).close(delete);
try {
shardInjector.getInstance(closeable).close(delete);
} catch (Exception e) {
logger.debug("failed to clean plugin shard service [{}]", e, closeable);
}
}
// close shard actions
shardInjector.getInstance(IndexShardManagement.class).close();
if (indexShard != null) {
shardInjector.getInstance(IndexShardManagement.class).close();
}
RecoveryAction recoveryAction = shardInjector.getInstance(RecoveryAction.class);
if (recoveryAction != null) recoveryAction.close();
// this logic is tricky, we want to close the engine so we rollback the changes done to it
// and close the shard so no operations are allowed to it
indexShard.close();
shardInjector.getInstance(Engine.class).close();
// now, we can snapshot to the gateway, it will be only the translog
shardInjector.getInstance(IndexShardGatewayService.class).close(deleteGateway);
// now we can close the translog
shardInjector.getInstance(Translog.class).close();
if (indexShard != null) {
indexShard.close();
}
try {
shardInjector.getInstance(Engine.class).close();
} catch (Exception e) {
// ignore
}
try {
// now, we can snapshot to the gateway, it will be only the translog
shardInjector.getInstance(IndexShardGatewayService.class).close(deleteGateway);
} catch (Exception e) {
// ignore
}
try {
// now we can close the translog
shardInjector.getInstance(Translog.class).close();
} catch (Exception e) {
// ignore
}
// call this before we close the store, so we can release resources for it
indicesLifecycle.afterIndexShardClosed(indexShard.shardId(), delete);
indicesLifecycle.afterIndexShardClosed(sId, delete);
// if we delete or have no gateway or the store is not persistent, clean the store...
Store store = shardInjector.getInstance(Store.class);
@ -322,7 +345,7 @@ public class InternalIndexService extends AbstractIndexComponent implements Inde
class CleanCacheOnIndicesLifecycleListener extends IndicesLifecycle.Listener {
@Override public void beforeIndexShardClosed(IndexShard indexShard, boolean delete) {
@Override public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexShard, boolean delete) {
indexCache.clearUnreferenced();
}

View File

@ -24,6 +24,8 @@ import org.elasticsearch.index.service.IndexService;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.service.IndexShard;
import javax.annotation.Nullable;
/**
* A global component allowing to register for lifecycle of an index (create/closed) and
* an index shard (created/closed).
@ -101,7 +103,7 @@ public interface IndicesLifecycle {
* @param indexShard The index shard
* @param delete Does the index shard gets closed because of a delete command, or because the node is shutting down
*/
public void beforeIndexShardClosed(IndexShard indexShard, boolean delete) {
public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexShard, boolean delete) {
}

View File

@ -27,6 +27,7 @@ import org.elasticsearch.index.service.IndexService;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.service.IndexShard;
import javax.annotation.Nullable;
import java.util.concurrent.CopyOnWriteArrayList;
/**
@ -84,9 +85,9 @@ public class InternalIndicesLifecycle extends AbstractComponent implements Indic
}
}
public void beforeIndexShardClosed(IndexShard indexShard, boolean delete) {
public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexShard, boolean delete) {
for (Listener listener : listeners) {
listener.beforeIndexShardClosed(indexShard, delete);
listener.beforeIndexShardClosed(shardId, indexShard, delete);
}
}

View File

@ -58,6 +58,7 @@ import org.elasticsearch.search.query.QuerySearchRequest;
import org.elasticsearch.search.query.QuerySearchResult;
import org.elasticsearch.timer.TimerService;
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
@ -399,8 +400,8 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
releaseContextsForIndex(indexService.index());
}
@Override public void beforeIndexShardClosed(IndexShard indexShard, boolean delete) {
releaseContextsForShard(indexShard.shardId());
@Override public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexShard, boolean delete) {
releaseContextsForShard(shardId);
}
}