apply review comments

This commit is contained in:
Simon Willnauer 2015-10-21 09:34:25 +02:00
parent b772b513e0
commit cba210c439
2 changed files with 21 additions and 16 deletions

View File

@ -166,7 +166,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett
private final MeanMetric refreshMetric = new MeanMetric();
private final MeanMetric flushMetric = new MeanMetric();
private final ShardEngineFailListener engineEventListener = new ShardEngineFailListener();
private final ShardEventListener shardEventListener = new ShardEventListener();
private volatile boolean flushOnClose = true;
private volatile int flushThresholdOperations;
private volatile ByteSizeValue flushThresholdSize;
@ -979,7 +979,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett
public static final String INDEX_REFRESH_INTERVAL = "index.refresh_interval";
public void addShardFailureCallback(Callback<ShardFailure> onShardFailure) {
this.engineEventListener.delegates.add(onShardFailure);
this.shardEventListener.delegates.add(onShardFailure);
}
/** Change the indexing and translog buffer sizes. If {@code IndexWriter} is currently using more than
@ -1368,13 +1368,13 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett
return this.currentEngineReference.get();
}
class ShardEngineFailListener implements Engine.EventListener {
class ShardEventListener implements Engine.EventListener {
private final CopyOnWriteArrayList<Callback<ShardFailure>> delegates = new CopyOnWriteArrayList<>();
// called by the current engine
@Override
public void onFailedEngine(String reason, @Nullable Throwable failure) {
final ShardFailure shardFailure = new ShardFailure(shardRouting, reason, failure);
final ShardFailure shardFailure = new ShardFailure(shardRouting, reason, failure, getIndexUUID());
for (Callback<ShardFailure> listener : delegates) {
try {
listener.handle(shardFailure);
@ -1457,7 +1457,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett
};
return new EngineConfig(shardId,
threadPool, indexingService, indexSettings, warmer, store, deletionPolicy, mergePolicyConfig.getMergePolicy(), mergeSchedulerConfig,
mapperService.indexAnalyzer(), similarityService.similarity(mapperService), codecService, engineEventListener, translogRecoveryPerformer, indexCache.query(), cachingPolicy, translogConfig);
mapperService.indexAnalyzer(), similarityService.similarity(mapperService), codecService, shardEventListener, translogRecoveryPerformer, indexCache.query(), cachingPolicy, translogConfig);
}
private static class IndexShardOperationCounter extends AbstractRefCounted {
@ -1578,12 +1578,15 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett
public static final class ShardFailure {
public final ShardRouting routing;
public final String reason;
@Nullable
public final Throwable cause;
public final String indexUUID;
public ShardFailure(ShardRouting routing, String reason, Throwable cause) {
public ShardFailure(ShardRouting routing, String reason, @Nullable Throwable cause, String indexUUID) {
this.routing = routing;
this.reason = reason;
this.cause = cause;
this.indexUUID = indexUUID;
}
}

View File

@ -98,7 +98,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
}
private final Object mutex = new Object();
private final FailedEngineHandler failedEngineHandler = new FailedEngineHandler();
private final FailedShardHandler failedShardHandler = new FailedShardHandler();
private final boolean sendRefreshMapping;
@ -381,7 +381,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
// so this failure typically means wrong node level configuration or something similar
for (IndexShard indexShard : indexService) {
ShardRouting shardRouting = indexShard.routingEntry();
failAndRemoveShard(shardRouting, indexService, true, "failed to update mappings", t);
failAndRemoveShard(shardRouting, indexService.indexUUID(), indexService, true, "failed to update mappings", t);
}
}
}
@ -637,11 +637,11 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
}
IndexShard indexShard = indexService.createShard(shardId, shardRouting);
indexShard.updateRoutingEntry(shardRouting, state.blocks().disableStatePersistence() == false);
indexShard.addShardFailureCallback(failedEngineHandler);
indexShard.addShardFailureCallback(failedShardHandler);
} catch (IndexShardAlreadyExistsException e) {
// ignore this, the method call can happen several times
} catch (Throwable e) {
failAndRemoveShard(shardRouting, indexService, true, "failed to create shard", e);
failAndRemoveShard(shardRouting, indexService.indexUUID(), indexService, true, "failed to create shard", e);
return;
}
}
@ -768,7 +768,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
private void handleRecoveryFailure(IndexService indexService, ShardRouting shardRouting, boolean sendShardFailure, Throwable failure) {
synchronized (mutex) {
failAndRemoveShard(shardRouting, indexService, sendShardFailure, "failed recovery", failure);
failAndRemoveShard(shardRouting, indexService.indexUUID(), indexService, sendShardFailure, "failed recovery", failure);
}
}
@ -802,8 +802,10 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
}
private void failAndRemoveShard(ShardRouting shardRouting, IndexService indexService, boolean sendShardFailure, String message, @Nullable Throwable failure) {
if (indexService.hasShard(shardRouting.getId())) {
private void failAndRemoveShard(ShardRouting shardRouting, String indexUUID, @Nullable IndexService indexService, boolean sendShardFailure, String message, @Nullable Throwable failure) {
if (indexService != null && indexService.hasShard(shardRouting.getId())) {
// if the indexService is null we can't remove the shard, that's fine since we might have a failure
// when the index is remove and then we already removed the index service for that shard...
try {
indexService.removeShard(shardRouting.getId(), message);
} catch (ShardNotFoundException e) {
@ -813,7 +815,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
}
}
if (sendShardFailure) {
sendFailShard(shardRouting, indexService.indexUUID(), message, failure);
sendFailShard(shardRouting, indexUUID, message, failure);
}
}
@ -827,14 +829,14 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
}
}
private class FailedEngineHandler implements Callback<IndexShard.ShardFailure> {
private class FailedShardHandler implements Callback<IndexShard.ShardFailure> {
@Override
public void handle(final IndexShard.ShardFailure shardFailure) {
final IndexService indexService = indicesService.indexService(shardFailure.routing.shardId().index().name());
final ShardRouting shardRouting = shardFailure.routing;
threadPool.generic().execute(() -> {
synchronized (mutex) {
failAndRemoveShard(shardRouting, indexService, true, "engine failure, reason [" + shardFailure.reason + "]", shardFailure.cause);
failAndRemoveShard(shardRouting, shardFailure.indexUUID, indexService, true, "shard failure, reason [" + shardFailure.reason + "]", shardFailure.cause);
}
});
}