mirror of
https://github.com/honeymoose/OpenSearch.git
synced 2025-03-09 14:34:43 +00:00
improve dangling index support to not detect explicit deleted index as dangling, harden when we delete the _state of an index
This commit is contained in:
parent
b2c4876626
commit
1668533556
@ -21,8 +21,10 @@ package org.elasticsearch.gateway.blobstore;
|
||||
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.Lists;
|
||||
import org.elasticsearch.ElasticSearchException;
|
||||
import org.elasticsearch.cluster.ClusterName;
|
||||
import org.elasticsearch.cluster.ClusterService;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.blobstore.*;
|
||||
@ -140,6 +142,11 @@ public abstract class BlobStoreGateway extends SharedStorageGateway {
|
||||
return commitPoints.commits().get(0);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void delete(IndexMetaData indexMetaData) throws ElasticSearchException {
|
||||
BlobPath indexPath = basePath().add("indices").add(indexMetaData.index());
|
||||
blobStore.delete(indexPath);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(MetaData metaData) throws GatewayException {
|
||||
|
@ -98,6 +98,7 @@ public class LocalGatewayMetaState extends AbstractComponent implements ClusterS
|
||||
|
||||
private final LocalAllocateDangledIndices allocateDangledIndices;
|
||||
|
||||
@Nullable
|
||||
private volatile MetaData currentMetaData;
|
||||
|
||||
private final XContentType format;
|
||||
@ -145,8 +146,8 @@ public class LocalGatewayMetaState extends AbstractComponent implements ClusterS
|
||||
}
|
||||
}
|
||||
|
||||
public MetaData currentMetaData() {
|
||||
return currentMetaData;
|
||||
public MetaData loadMetaState() throws Exception {
|
||||
return loadState();
|
||||
}
|
||||
|
||||
public boolean isDangling(String index) {
|
||||
@ -156,27 +157,36 @@ public class LocalGatewayMetaState extends AbstractComponent implements ClusterS
|
||||
@Override
|
||||
public void clusterChanged(ClusterChangedEvent event) {
|
||||
if (event.state().blocks().disableStatePersistence()) {
|
||||
// reset the current metadata, we need to start fresh...
|
||||
this.currentMetaData = null;
|
||||
return;
|
||||
}
|
||||
|
||||
MetaData newMetaData = event.state().metaData();
|
||||
// we don't check if metaData changed, since we might be called several times and we need to check dangling...
|
||||
|
||||
boolean success = true;
|
||||
// only applied to master node, writing the global and index level states
|
||||
if (event.state().nodes().localNode().masterNode()) {
|
||||
// check if the global state changed?
|
||||
if (currentMetaData == null || !MetaData.isGlobalStateEquals(currentMetaData, event.state().metaData())) {
|
||||
if (currentMetaData == null || !MetaData.isGlobalStateEquals(currentMetaData, newMetaData)) {
|
||||
try {
|
||||
writeGlobalState("changed", event.state().metaData(), currentMetaData);
|
||||
writeGlobalState("changed", newMetaData, currentMetaData);
|
||||
} catch (Exception e) {
|
||||
success = false;
|
||||
}
|
||||
}
|
||||
|
||||
// check and write changes in indices
|
||||
for (IndexMetaData indexMetaData : event.state().metaData()) {
|
||||
for (IndexMetaData indexMetaData : newMetaData) {
|
||||
String writeReason = null;
|
||||
IndexMetaData currentIndexMetaData = currentMetaData == null ? null : currentMetaData.index(indexMetaData.index());
|
||||
IndexMetaData currentIndexMetaData;
|
||||
if (currentMetaData == null) {
|
||||
// a new event..., check from the state stored
|
||||
currentIndexMetaData = loadIndex(indexMetaData.index());
|
||||
} else {
|
||||
currentIndexMetaData = currentMetaData.index(indexMetaData.index());
|
||||
}
|
||||
if (currentIndexMetaData == null) {
|
||||
writeReason = "freshly created";
|
||||
} else if (currentIndexMetaData.version() != indexMetaData.version()) {
|
||||
@ -196,12 +206,31 @@ public class LocalGatewayMetaState extends AbstractComponent implements ClusterS
|
||||
}
|
||||
}
|
||||
|
||||
// delete indices that were there before, but are deleted now
|
||||
// we need to do it so they won't be detected as dangling
|
||||
if (nodeEnv.hasNodeFile()) {
|
||||
if (currentMetaData != null) {
|
||||
// only delete indices when we already received a state (currentMetaData != null)
|
||||
// and we had a go at processing dangling indices at least once
|
||||
// this will also delete the _state of the index itself
|
||||
for (IndexMetaData current : currentMetaData) {
|
||||
if (danglingIndices.containsKey(current.index())) {
|
||||
continue;
|
||||
}
|
||||
if (!newMetaData.hasIndex(current.index())) {
|
||||
logger.debug("[{}] deleting index that is no longer part of the metadata");
|
||||
FileSystemUtils.deleteRecursively(nodeEnv.indexLocations(new Index(current.index())));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// handle dangling indices, we handle those for all nodes that have a node file (data or master)
|
||||
if (nodeEnv.hasNodeFile()) {
|
||||
if (danglingTimeout.millis() >= 0) {
|
||||
synchronized (danglingMutex) {
|
||||
for (String danglingIndex : danglingIndices.keySet()) {
|
||||
if (event.state().metaData().hasIndex(danglingIndex)) {
|
||||
if (newMetaData.hasIndex(danglingIndex)) {
|
||||
logger.debug("[{}] no longer dangling (created), removing", danglingIndex);
|
||||
DanglingIndex removed = danglingIndices.remove(danglingIndex);
|
||||
removed.future.cancel(false);
|
||||
@ -211,19 +240,22 @@ public class LocalGatewayMetaState extends AbstractComponent implements ClusterS
|
||||
try {
|
||||
for (String indexName : nodeEnv.findAllIndices()) {
|
||||
// if we have the index on the metadata, don't delete it
|
||||
if (event.state().metaData().hasIndex(indexName)) {
|
||||
if (newMetaData.hasIndex(indexName)) {
|
||||
continue;
|
||||
}
|
||||
if (danglingIndices.containsKey(indexName)) {
|
||||
// already dangling, continue
|
||||
continue;
|
||||
}
|
||||
if (danglingTimeout.millis() == 0) {
|
||||
logger.info("[{}] dangling index, exists on local file system, but not in cluster metadata, timeout set to 0, deleting now", indexName);
|
||||
FileSystemUtils.deleteRecursively(nodeEnv.indexLocations(new Index(indexName)));
|
||||
} else {
|
||||
logger.info("[{}] dangling index, exists on local file system, but not in cluster metadata, scheduling to delete in [{}], auto import to cluster state [{}]", indexName, danglingTimeout, autoImportDangled);
|
||||
danglingIndices.put(indexName, new DanglingIndex(indexName, threadPool.schedule(danglingTimeout, ThreadPool.Names.SAME, new RemoveDanglingIndex(indexName))));
|
||||
IndexMetaData indexMetaData = loadIndex(indexName);
|
||||
if (indexMetaData != null) {
|
||||
if (danglingTimeout.millis() == 0) {
|
||||
logger.info("[{}] dangling index, exists on local file system, but not in cluster metadata, timeout set to 0, deleting now", indexName);
|
||||
FileSystemUtils.deleteRecursively(nodeEnv.indexLocations(new Index(indexName)));
|
||||
} else {
|
||||
logger.info("[{}] dangling index, exists on local file system, but not in cluster metadata, scheduling to delete in [{}], auto import to cluster state [{}]", indexName, danglingTimeout, autoImportDangled);
|
||||
danglingIndices.put(indexName, new DanglingIndex(indexName, threadPool.schedule(danglingTimeout, ThreadPool.Names.SAME, new RemoveDanglingIndex(indexName))));
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
@ -235,6 +267,10 @@ public class LocalGatewayMetaState extends AbstractComponent implements ClusterS
|
||||
final List<IndexMetaData> dangled = Lists.newArrayList();
|
||||
for (String indexName : danglingIndices.keySet()) {
|
||||
IndexMetaData indexMetaData = loadIndex(indexName);
|
||||
if (indexMetaData == null) {
|
||||
logger.debug("failed to find state for dangling index [{}]", indexName);
|
||||
continue;
|
||||
}
|
||||
// we might have someone copying over an index, renaming the directory, handle that
|
||||
if (!indexMetaData.index().equals(indexName)) {
|
||||
logger.info("dangled index directory name is [{}], state name is [{}], renaming to directory name", indexName, indexMetaData.index());
|
||||
@ -266,21 +302,8 @@ public class LocalGatewayMetaState extends AbstractComponent implements ClusterS
|
||||
}
|
||||
}
|
||||
|
||||
if (event.state().nodes().localNode().masterNode()) {
|
||||
// delete indices that are no longer there..., allocated dangled ones
|
||||
if (currentMetaData != null) {
|
||||
for (IndexMetaData current : currentMetaData) {
|
||||
if (event.state().metaData().index(current.index()) == null) {
|
||||
if (!danglingIndices.containsKey(current.index())) {
|
||||
deleteIndex(current.index());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (success) {
|
||||
currentMetaData = event.state().metaData();
|
||||
currentMetaData = newMetaData;
|
||||
}
|
||||
}
|
||||
|
||||
@ -383,7 +406,7 @@ public class LocalGatewayMetaState extends AbstractComponent implements ClusterS
|
||||
}
|
||||
|
||||
// delete the old files
|
||||
if (previousMetaData != null && previousMetaData.version() != currentMetaData.version()) {
|
||||
if (previousMetaData != null && previousMetaData.version() != metaData.version()) {
|
||||
for (File dataLocation : nodeEnv.nodeDataLocations()) {
|
||||
File stateFile = new File(new File(dataLocation, "_state"), "global-" + previousMetaData.version());
|
||||
stateFile.delete();
|
||||
@ -394,7 +417,7 @@ public class LocalGatewayMetaState extends AbstractComponent implements ClusterS
|
||||
}
|
||||
}
|
||||
|
||||
private void loadState() throws Exception {
|
||||
private MetaData loadState() throws Exception {
|
||||
MetaData.Builder metaDataBuilder = MetaData.builder();
|
||||
MetaData globalMetaData = loadGlobalState();
|
||||
if (globalMetaData != null) {
|
||||
@ -410,9 +433,10 @@ public class LocalGatewayMetaState extends AbstractComponent implements ClusterS
|
||||
metaDataBuilder.put(indexMetaData, false);
|
||||
}
|
||||
}
|
||||
currentMetaData = metaDataBuilder.build();
|
||||
return metaDataBuilder.build();
|
||||
}
|
||||
|
||||
@Nullable
|
||||
private IndexMetaData loadIndex(String index) {
|
||||
long highestVersion = -1;
|
||||
IndexMetaData indexMetaData = null;
|
||||
|
@ -116,7 +116,11 @@ public class TransportNodesListGatewayMetaState extends TransportNodesOperationA
|
||||
|
||||
@Override
|
||||
protected NodeLocalGatewayMetaState nodeOperation(NodeRequest request) throws ElasticSearchException {
|
||||
return new NodeLocalGatewayMetaState(clusterService.localNode(), metaState.currentMetaData());
|
||||
try {
|
||||
return new NodeLocalGatewayMetaState(clusterService.localNode(), metaState.loadMetaState());
|
||||
} catch (Exception e) {
|
||||
throw new ElasticSearchException("failed to load metadata", e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -25,12 +25,17 @@ import org.elasticsearch.cluster.ClusterChangedEvent;
|
||||
import org.elasticsearch.cluster.ClusterService;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.ClusterStateListener;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.common.StopWatch;
|
||||
import org.elasticsearch.common.component.AbstractLifecycleComponent;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.io.FileSystemUtils;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.env.NodeEnvironment;
|
||||
import org.elasticsearch.gateway.Gateway;
|
||||
import org.elasticsearch.gateway.GatewayException;
|
||||
import org.elasticsearch.index.Index;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
|
||||
import java.util.concurrent.ExecutorService;
|
||||
@ -50,12 +55,21 @@ public abstract class SharedStorageGateway extends AbstractLifecycleComponent<Ga
|
||||
|
||||
private ExecutorService writeStateExecutor;
|
||||
|
||||
private volatile MetaData currentMetaData;
|
||||
|
||||
private NodeEnvironment nodeEnv;
|
||||
|
||||
public SharedStorageGateway(Settings settings, ThreadPool threadPool, ClusterService clusterService) {
|
||||
super(settings);
|
||||
this.threadPool = threadPool;
|
||||
this.clusterService = clusterService;
|
||||
this.writeStateExecutor = newSingleThreadExecutor(daemonThreadFactory(settings, "gateway#writeMetaData"));
|
||||
clusterService.add(this);
|
||||
clusterService.addLast(this);
|
||||
}
|
||||
|
||||
@Inject
|
||||
public void setNodeEnv(NodeEnvironment nodeEnv) {
|
||||
this.nodeEnv = nodeEnv;
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -110,16 +124,17 @@ public abstract class SharedStorageGateway extends AbstractLifecycleComponent<Ga
|
||||
|
||||
// nothing to do until we actually recover from the gateway or any other block indicates we need to disable persistency
|
||||
if (event.state().blocks().disableStatePersistence()) {
|
||||
this.currentMetaData = null;
|
||||
return;
|
||||
}
|
||||
|
||||
if (event.localNodeMaster()) {
|
||||
if (!event.metaDataChanged()) {
|
||||
return;
|
||||
}
|
||||
writeStateExecutor.execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
if (!event.metaDataChanged()) {
|
||||
return;
|
||||
}
|
||||
writeStateExecutor.execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
if (event.localNodeMaster()) {
|
||||
logger.debug("writing to gateway {} ...", this);
|
||||
StopWatch stopWatch = new StopWatch().start();
|
||||
try {
|
||||
@ -129,12 +144,31 @@ public abstract class SharedStorageGateway extends AbstractLifecycleComponent<Ga
|
||||
} catch (Exception e) {
|
||||
logger.error("failed to write to gateway", e);
|
||||
}
|
||||
if (currentMetaData != null) {
|
||||
for (IndexMetaData current : currentMetaData) {
|
||||
if (!event.state().metaData().hasIndex(current.index())) {
|
||||
delete(current);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
if (nodeEnv != null && nodeEnv.hasNodeFile()) {
|
||||
if (currentMetaData != null) {
|
||||
for (IndexMetaData current : currentMetaData) {
|
||||
if (!event.state().metaData().hasIndex(current.index())) {
|
||||
FileSystemUtils.deleteRecursively(nodeEnv.indexLocations(new Index(current.index())));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
currentMetaData = event.state().metaData();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
protected abstract MetaData read() throws ElasticSearchException;
|
||||
|
||||
protected abstract void write(MetaData metaData) throws ElasticSearchException;
|
||||
|
||||
protected abstract void delete(IndexMetaData indexMetaData) throws ElasticSearchException;
|
||||
}
|
||||
|
@ -229,12 +229,12 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
|
||||
for (final String index : indicesService.indices()) {
|
||||
if (!event.state().metaData().hasIndex(index)) {
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("[{}] deleting index", index);
|
||||
logger.debug("[{}] cleaning index, no longer part of the metadata", index);
|
||||
}
|
||||
try {
|
||||
indicesService.deleteIndex(index, "deleting index");
|
||||
indicesService.cleanIndex(index, "index no longer part of the metadata");
|
||||
} catch (Exception e) {
|
||||
logger.warn("failed to delete index", e);
|
||||
logger.warn("failed to clean index", e);
|
||||
}
|
||||
// clear seen mappings as well
|
||||
for (Tuple<String, String> tuple : seenMappings.keySet()) {
|
||||
|
@ -163,8 +163,8 @@ public class IndicesStore extends AbstractComponent implements ClusterStateListe
|
||||
}
|
||||
}
|
||||
|
||||
// do the reverse, and delete dangling indices / shards that might remain on that node
|
||||
// this can happen when deleting a closed index, or when a node joins and it has deleted indices / shards
|
||||
// do the reverse, and delete dangling shards that might remain on that node
|
||||
// but are allocated on other nodes
|
||||
if (nodeEnv.hasNodeFile()) {
|
||||
// delete unused shards for existing indices
|
||||
for (IndexRoutingTable indexRoutingTable : routingTable) {
|
||||
|
Loading…
x
Reference in New Issue
Block a user