sync changes applied by the indices cluster state service with changes happening asynchronously during recovery

kimchy 2010-11-15 16:54:29 +02:00
parent 4b06eeb75a
commit effdd52586
1 changed file with 34 additions and 26 deletions
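The pattern the diff introduces is a single monitor object shared by the cluster-state apply path and the asynchronous recovery callbacks. A minimal standalone sketch of that pattern (the ShardRegistry class and its methods are illustrative stand-ins, not Elasticsearch APIs):

// Minimal sketch of the locking pattern introduced below; ShardRegistry and
// its methods are illustrative stand-ins, not Elasticsearch APIs.
import java.util.HashSet;
import java.util.Set;

public class ShardRegistry {

    private final Object mutex = new Object();

    private final Set<Integer> shards = new HashSet<Integer>();

    // Runs on the cluster state update thread.
    public void applyClusterState(Set<Integer> assignedShards) {
        synchronized (mutex) {
            shards.retainAll(assignedShards); // apply deletions
            shards.addAll(assignedShards);    // apply creations
        }
    }

    // Runs on a recovery thread, some time after the shard started recovering.
    public void onRecoveryFailed(int shardId) {
        synchronized (mutex) {
            // the check and the removal are atomic with respect to
            // applyClusterState() because both hold the same mutex
            if (shards.contains(shardId)) {
                shards.remove(shardId);
            }
        }
    }
}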


@@ -86,6 +86,8 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
     // a map of mappings type we have seen per index
     private final ConcurrentMap<Tuple<String, String>, Boolean> seenMappings = ConcurrentCollections.newConcurrentMap();
 
+    private final Object mutex = new Object();
+
     @Inject public IndicesClusterStateService(Settings settings, IndicesService indicesService, ClusterService clusterService,
                                               ThreadPool threadPool, RecoveryTarget recoveryTarget, ShardStateAction shardStateAction,
                                               NodeIndexCreatedAction nodeIndexCreatedAction, NodeIndexDeletedAction nodeIndexDeletedAction,
@@ -116,12 +118,14 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
         if (!indicesService.changesAllowed())
             return;
 
-        applyNewIndices(event);
-        applyMappings(event);
-        applyNewOrUpdatedShards(event);
-        applyDeletedIndices(event);
-        applyDeletedShards(event);
-        applyCleanedIndices(event);
+        synchronized (mutex) {
+            applyNewIndices(event);
+            applyMappings(event);
+            applyNewOrUpdatedShards(event);
+            applyDeletedIndices(event);
+            applyDeletedShards(event);
+            applyCleanedIndices(event);
+        }
     }
 
     private void applyCleanedIndices(final ClusterChangedEvent event) {
@@ -426,13 +430,15 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
         if (!removeShard) {
             return;
         }
-        if (indexService.hasShard(shardRouting.shardId().id())) {
-            try {
-                indexService.removeShard(shardRouting.shardId().id());
-            } catch (IndexShardMissingException e) {
-                // the node got closed on us, ignore it
-            } catch (Exception e1) {
-                logger.warn("[{}][{}] failed to delete shard after ignore recovery", e1, indexService.index().name(), shardRouting.shardId().id());
+        synchronized (mutex) {
+            if (indexService.hasShard(shardRouting.shardId().id())) {
+                try {
+                    indexService.removeShard(shardRouting.shardId().id());
+                } catch (IndexShardMissingException e) {
+                    // the node got closed on us, ignore it
+                } catch (Exception e1) {
+                    logger.warn("[{}][{}] failed to delete shard after ignore recovery", e1, indexService.index().name(), shardRouting.shardId().id());
+                }
             }
         }
     }
@@ -444,20 +450,22 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
     private void handleRecoveryFailure(IndexService indexService, ShardRouting shardRouting, boolean sendShardFailure, Throwable failure) {
         logger.warn("[{}][{}] failed to start shard", failure, indexService.index().name(), shardRouting.shardId().id());
-        if (indexService.hasShard(shardRouting.shardId().id())) {
-            try {
-                indexService.cleanShard(shardRouting.shardId().id());
-            } catch (IndexShardMissingException e) {
-                // the node got closed on us, ignore it
-            } catch (Exception e1) {
-                logger.warn("[{}][{}] failed to delete shard after failed startup", e1, indexService.index().name(), shardRouting.shardId().id());
-            }
-        }
-        if (sendShardFailure) {
-            try {
-                shardStateAction.shardFailed(shardRouting, "Failed to start shard, message [" + detailedMessage(failure) + "]");
-            } catch (Exception e1) {
-                logger.warn("[{}][{}] failed to mark shard as failed after a failed start", e1, indexService.index().name(), shardRouting.id());
-            }
-        }
+        synchronized (mutex) {
+            if (indexService.hasShard(shardRouting.shardId().id())) {
+                try {
+                    indexService.cleanShard(shardRouting.shardId().id());
+                } catch (IndexShardMissingException e) {
+                    // the node got closed on us, ignore it
+                } catch (Exception e1) {
+                    logger.warn("[{}][{}] failed to delete shard after failed startup", e1, indexService.index().name(), shardRouting.shardId().id());
+                }
+            }
+            if (sendShardFailure) {
+                try {
+                    shardStateAction.shardFailed(shardRouting, "Failed to start shard, message [" + detailedMessage(failure) + "]");
+                } catch (Exception e1) {
+                    logger.warn("[{}][{}] failed to mark shard as failed after a failed start", e1, indexService.index().name(), shardRouting.id());
+                }
+            }
+        }
     }
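What the lock buys in the two recovery hunks is atomicity for the check-then-act sequence around hasShard(...). Before this change, the callback ran without any lock (simplified, using the method names from the diff):

// check-then-act with no shared lock (the pre-commit shape, simplified)
if (indexService.hasShard(shardId)) {
    // a concurrent clusterChanged() pass can delete and recreate the shard
    // in this window, so the remove below may hit a different shard instance
    // or throw IndexShardMissingException
    indexService.removeShard(shardId);
}

With every path funneled through mutex, a cluster state update is applied as one atomic unit relative to the recovery callbacks. The cost is a coarse lock, but since cluster state updates and recovery failures are both infrequent, contention on it should not matter in practice.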