Use provided cluster state for indices service validations
Since the method can be called in an #execute event of the cluster service, we need the ability to use the cluster state that will be provided in the ClusterChangedEvent, have the ClusterState be provided as a parameter
This commit is contained in:
parent
0a3175a16f
commit
48bdd58d51
|
@ -27,6 +27,7 @@ import org.elasticsearch.ElasticsearchIllegalArgumentException;
|
|||
import org.elasticsearch.ElasticsearchIllegalStateException;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.cluster.ClusterChangedEvent;
|
||||
import org.elasticsearch.cluster.ClusterService;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.ClusterStateListener;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
|
@ -136,11 +137,12 @@ public class GatewayMetaState extends AbstractComponent implements ClusterStateL
|
|||
private final Map<String, DanglingIndex> danglingIndices = ConcurrentCollections.newConcurrentMap();
|
||||
private final Object danglingMutex = new Object();
|
||||
private final IndicesService indicesService;
|
||||
private final ClusterService clusterService;
|
||||
|
||||
@Inject
|
||||
public GatewayMetaState(Settings settings, ThreadPool threadPool, NodeEnvironment nodeEnv,
|
||||
TransportNodesListGatewayMetaState nodesListGatewayMetaState, LocalAllocateDangledIndices allocateDangledIndices,
|
||||
IndicesService indicesService) throws Exception {
|
||||
IndicesService indicesService, ClusterService clusterService) throws Exception {
|
||||
super(settings);
|
||||
this.nodeEnv = nodeEnv;
|
||||
this.threadPool = threadPool;
|
||||
|
@ -187,6 +189,7 @@ public class GatewayMetaState extends AbstractComponent implements ClusterStateL
|
|||
}
|
||||
}
|
||||
this.indicesService = indicesService;
|
||||
this.clusterService = clusterService;
|
||||
}
|
||||
|
||||
public MetaData loadMetaState() throws Exception {
|
||||
|
@ -279,7 +282,7 @@ public class GatewayMetaState extends AbstractComponent implements ClusterStateL
|
|||
danglingIndices.put(indexName, new DanglingIndex(indexName, null));
|
||||
} else if (danglingTimeout.millis() == 0) {
|
||||
logger.info("[{}] dangling index, exists on local file system, but not in cluster metadata, timeout set to 0, deleting now", indexName);
|
||||
indicesService.deleteIndexStore("dangling index with timeout set to 0", indexMetaData);
|
||||
indicesService.deleteIndexStore("dangling index with timeout set to 0", indexMetaData, state);
|
||||
} else {
|
||||
logger.info("[{}] dangling index, exists on local file system, but not in cluster metadata, scheduling to delete in [{}], auto import to cluster state [{}]", indexName, danglingTimeout, autoImportDangled);
|
||||
danglingIndices.put(indexName,
|
||||
|
@ -530,7 +533,7 @@ public class GatewayMetaState extends AbstractComponent implements ClusterStateL
|
|||
}
|
||||
logger.warn("[{}] deleting dangling index", metaData.index());
|
||||
try {
|
||||
indicesService.deleteIndexStore("deleting dangling index", metaData);
|
||||
indicesService.deleteIndexStore("deleting dangling index", metaData, clusterService.state());
|
||||
} catch (Exception ex) {
|
||||
logger.debug("failed to delete dangling index", ex);
|
||||
}
|
||||
|
|
|
@ -118,7 +118,6 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService> i
|
|||
|
||||
private final PluginsService pluginsService;
|
||||
private final NodeEnvironment nodeEnv;
|
||||
private final ClusterService clusterService;
|
||||
|
||||
private volatile Map<String, Tuple<IndexService, Injector>> indices = ImmutableMap.of();
|
||||
private final Map<Index, List<PendingDelete>> pendingDeletes = new HashMap<>();
|
||||
|
@ -126,10 +125,9 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService> i
|
|||
private final OldShardsStats oldShardsStats = new OldShardsStats();
|
||||
|
||||
@Inject
|
||||
public IndicesService(Settings settings, IndicesLifecycle indicesLifecycle, IndicesAnalysisService indicesAnalysisService, Injector injector, NodeEnvironment nodeEnv, ClusterService clusterService) {
|
||||
public IndicesService(Settings settings, IndicesLifecycle indicesLifecycle, IndicesAnalysisService indicesAnalysisService, Injector injector, NodeEnvironment nodeEnv) {
|
||||
super(settings);
|
||||
this.indicesLifecycle = (InternalIndicesLifecycle) indicesLifecycle;
|
||||
this.clusterService = clusterService;
|
||||
this.indicesAnalysisService = indicesAnalysisService;
|
||||
this.injector = injector;
|
||||
this.pluginsService = injector.getInstance(PluginsService.class);
|
||||
|
@ -447,16 +445,15 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService> i
|
|||
removeIndex(index, reason, true);
|
||||
}
|
||||
|
||||
public void deleteClosedIndex(String reason, IndexMetaData metaData) {
|
||||
public void deleteClosedIndex(String reason, IndexMetaData metaData, ClusterState clusterState) {
|
||||
if (nodeEnv.hasNodeFile()) {
|
||||
String indexName = metaData.getIndex();
|
||||
try {
|
||||
ClusterState clusterState = clusterService.state();
|
||||
if (clusterState.metaData().hasIndex(indexName)) {
|
||||
final IndexMetaData index = clusterState.metaData().index(indexName);
|
||||
throw new ElasticsearchIllegalStateException("Can't delete closed index store for [" + indexName + "] - it's still part of the cluster state [" + index.getUUID() + "] [" + metaData.getUUID() + "]");
|
||||
}
|
||||
deleteIndexStore(reason, metaData);
|
||||
deleteIndexStore(reason, metaData, clusterState);
|
||||
} catch (IOException e) {
|
||||
logger.warn("[{}] failed to delete closed index", e, metaData.index());
|
||||
}
|
||||
|
@ -467,7 +464,7 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService> i
|
|||
* Deletes the index store trying to acquire all shards locks for this index.
|
||||
* This method will delete the metadata for the index even if the actual shards can't be locked.
|
||||
*/
|
||||
public void deleteIndexStore(String reason, IndexMetaData metaData) throws IOException {
|
||||
public void deleteIndexStore(String reason, IndexMetaData metaData, ClusterState clusterState) throws IOException {
|
||||
if (nodeEnv.hasNodeFile()) {
|
||||
synchronized (this) {
|
||||
String indexName = metaData.index();
|
||||
|
@ -475,7 +472,6 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService> i
|
|||
String localUUid = indices.get(indexName).v1().indexUUID();
|
||||
throw new ElasticsearchIllegalStateException("Can't delete index store for [" + indexName + "] - it's still part of the indices service [" + localUUid+ "] [" + metaData.getUUID() + "]");
|
||||
}
|
||||
ClusterState clusterState = clusterService.state();
|
||||
if (clusterState.metaData().hasIndex(indexName) && (clusterState.nodes().localNode().masterNode() == true)) {
|
||||
// we do not delete the store if it is a master eligible node and the index is still in the cluster state
|
||||
// because we want to keep the meta data for indices around even if no shards are left here
|
||||
|
|
|
@ -244,7 +244,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
|
|||
final IndexMetaData metaData = previousState.metaData().index(index);
|
||||
assert metaData != null;
|
||||
indexSettings = metaData.settings();
|
||||
indicesService.deleteClosedIndex("closed index no longer part of the metadata", metaData);
|
||||
indicesService.deleteClosedIndex("closed index no longer part of the metadata", metaData, event.state());
|
||||
}
|
||||
try {
|
||||
nodeIndexDeletedAction.nodeIndexDeleted(event.state(), index, indexSettings, localNodeId);
|
||||
|
|
|
@ -303,7 +303,7 @@ public class IndicesStore extends AbstractComponent implements ClusterStateListe
|
|||
// nodes keep the index metadata around
|
||||
if (indicesService.hasIndex(shardId.getIndex()) == false && currentState.nodes().localNode().masterNode() == false) {
|
||||
try {
|
||||
indicesService.deleteIndexStore("no longer used", indexMeta);
|
||||
indicesService.deleteIndexStore("no longer used", indexMeta, currentState);
|
||||
} catch (Throwable ex) {
|
||||
logger.debug("{} failed to delete unallocated index, ignoring", ex, shardId.getIndex());
|
||||
}
|
||||
|
|
|
@ -70,7 +70,7 @@ public class IndicesServiceTest extends ElasticsearchSingleNodeTest {
|
|||
assertTrue(test.hasShard(0));
|
||||
|
||||
try {
|
||||
indicesService.deleteIndexStore("boom", firstMetaData);
|
||||
indicesService.deleteIndexStore("boom", firstMetaData, clusterService.state());
|
||||
fail();
|
||||
} catch (ElasticsearchIllegalStateException ex) {
|
||||
// all good
|
||||
|
@ -100,7 +100,7 @@ public class IndicesServiceTest extends ElasticsearchSingleNodeTest {
|
|||
}
|
||||
|
||||
try {
|
||||
indicesService.deleteIndexStore("boom", secondMetaData);
|
||||
indicesService.deleteIndexStore("boom", secondMetaData, clusterService.state());
|
||||
fail();
|
||||
} catch (ElasticsearchIllegalStateException ex) {
|
||||
// all good
|
||||
|
@ -112,7 +112,7 @@ public class IndicesServiceTest extends ElasticsearchSingleNodeTest {
|
|||
|
||||
// now delete the old one and make sure we resolve against the name
|
||||
try {
|
||||
indicesService.deleteIndexStore("boom", firstMetaData);
|
||||
indicesService.deleteIndexStore("boom", firstMetaData, clusterService.state());
|
||||
fail();
|
||||
} catch (ElasticsearchIllegalStateException ex) {
|
||||
// all good
|
||||
|
|
Loading…
Reference in New Issue