dangling index handling might still remove the state files for the dangling index, closes #2065.

Shay Banon 2012-06-28 13:32:44 +02:00
parent 7454c7c192
commit a872c88f03
6 changed files with 228 additions and 88 deletions

View File: LocalGateway.java

@@ -204,6 +204,8 @@ public class LocalGateway extends AbstractLifecycleComponent<Gateway> implements
if (event.state().blocks().disableStatePersistence()) {
return;
}
// order is important, first metaState, and then shardsState
// so dangling indices will be recorded
metaState.clusterChanged(event);
shardsState.clusterChanged(event);
}
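
The ordering comment above is the heart of this change: LocalGatewayMetaState must record dangling indices before LocalGatewayShardsState decides which shard state files to prune, so the new isDangling check (added further down) can see them. A minimal sketch of that interplay, using hypothetical names in place of the Elasticsearch types:

import java.util.HashSet;
import java.util.Set;

// Sketch only: the meta listener runs first and records dangling indices,
// the shards listener then skips deleting state for anything recorded.
class ListenerOrderSketch {
    final Set<String> dangling = new HashSet<String>();

    void metaClusterChanged(Set<String> indicesOnDisk, Set<String> indicesInMetaData) {
        for (String index : indicesOnDisk) {
            if (!indicesInMetaData.contains(index)) {
                dangling.add(index); // recorded first...
            }
        }
    }

    void shardsClusterChanged(String index, boolean stillInMetaData) {
        if (!stillInMetaData && !dangling.contains(index)) {
            // ...so this delete is skipped for dangling indices
            System.out.println("deleting shard state for " + index);
        }
    }
}

If the shards listener ran first, the shard state files of a dangling index would already be gone by the time the index was recorded as dangling.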

View File: LocalGatewayMetaState.java

@@ -36,9 +36,11 @@ import org.elasticsearch.common.io.stream.CachedStreamOutput;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.xcontent.*;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.Index;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.File;
import java.io.FileInputStream;
@@ -46,6 +48,7 @@ import java.io.FileOutputStream;
import java.io.IOException;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ScheduledFuture;
/**
*
@@ -53,16 +56,23 @@ import java.util.Set;
public class LocalGatewayMetaState extends AbstractComponent implements ClusterStateListener {
private final NodeEnvironment nodeEnv;
private final ThreadPool threadPool;
private volatile MetaData currentMetaData;
private final XContentType format;
private final ToXContent.Params formatParams;
private final TimeValue danglingTimeout;
private final Map<String, DanglingIndex> danglingIndices = ConcurrentCollections.newConcurrentMap();
private final Object danglingMutex = new Object();
@Inject
public LocalGatewayMetaState(Settings settings, NodeEnvironment nodeEnv, TransportNodesListGatewayMetaState nodesListGatewayMetaState) throws Exception {
public LocalGatewayMetaState(Settings settings, ThreadPool threadPool, NodeEnvironment nodeEnv, TransportNodesListGatewayMetaState nodesListGatewayMetaState) throws Exception {
super(settings);
this.nodeEnv = nodeEnv;
this.threadPool = threadPool;
this.format = XContentType.fromRestContentType(settings.get("format", "smile"));
nodesListGatewayMetaState.init(this);
@@ -74,6 +84,8 @@ public class LocalGatewayMetaState extends AbstractComponent implements ClusterS
formatParams = ToXContent.EMPTY_PARAMS;
}
this.danglingTimeout = settings.getAsTime("gateway.local.dangling_timeout", TimeValue.timeValueHours(2));
if (DiscoveryNode.masterNode(settings)) {
try {
pre019Upgrade();
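
The new gateway.local.dangling_timeout setting read above drives three behaviors in the clusterChanged logic below (semantics as implemented in this diff): a positive value schedules deletion of the dangling index data after the timeout, zero deletes it immediately, and a negative value disables dangling-index handling altogether (the >= 0 guard below). A sketch with hypothetical values, using the Settings and TimeValue types already imported above:

// Sketch: the three modes of gateway.local.dangling_timeout (hypothetical values)
Settings thirtyMinutes = ImmutableSettings.settingsBuilder()
        .put("gateway.local.dangling_timeout", "30m").build(); // schedule deletion in 30m
Settings immediate = ImmutableSettings.settingsBuilder()
        .put("gateway.local.dangling_timeout", 0).build();     // delete right away
Settings disabled = ImmutableSettings.settingsBuilder()
        .put("gateway.local.dangling_timeout", -1).build();    // never track or delete
TimeValue timeout = thirtyMinutes.getAsTime("gateway.local.dangling_timeout", TimeValue.timeValueHours(2));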
@@ -91,6 +103,10 @@ public class LocalGatewayMetaState extends AbstractComponent implements ClusterS
return currentMetaData;
}
public boolean isDangling(String index) {
return danglingIndices.containsKey(index);
}
@Override
public void clusterChanged(ClusterChangedEvent event) {
if (event.state().blocks().disableStatePersistence()) {
@@ -137,11 +153,50 @@ public class LocalGatewayMetaState extends AbstractComponent implements ClusterS
}
}
// handle dangling indices
if (nodeEnv.hasNodeFile()) {
if (danglingTimeout.millis() >= 0) {
synchronized (danglingMutex) {
for (String danglingIndex : danglingIndices.keySet()) {
if (event.state().metaData().hasIndex(danglingIndex)) {
logger.debug("[{}] no longer dangling (created), removing", danglingIndex);
DanglingIndex removed = danglingIndices.remove(danglingIndex);
removed.future.cancel(false);
}
}
// delete indices that are no longer part of the metadata
try {
for (String indexName : nodeEnv.findAllIndices()) {
// if we have the index in the metadata, don't delete it
if (event.state().metaData().hasIndex(indexName)) {
continue;
}
if (danglingIndices.containsKey(indexName)) {
// already dangling, continue
continue;
}
if (danglingTimeout.millis() == 0) {
logger.info("[{}] dangling index, exists on local file system, but not in cluster metadata, timeout set to 0, deleting now", indexName);
FileSystemUtils.deleteRecursively(nodeEnv.indexLocations(new Index(indexName)));
} else {
logger.info("[{}] dangling index, exists on local file system, but not in cluster metadata, scheduling to delete in [{}]", indexName, danglingTimeout);
danglingIndices.put(indexName, new DanglingIndex(indexName, threadPool.schedule(danglingTimeout, ThreadPool.Names.SAME, new RemoveDanglingIndex(indexName))));
}
}
} catch (Exception e) {
logger.warn("failed to find dangling indices", e);
}
}
}
}
// delete indices that are no longer there...
if (currentMetaData != null) {
for (IndexMetaData current : currentMetaData) {
if (event.state().metaData().index(current.index()) == null) {
deleteIndex(current.index());
if (!danglingIndices.containsKey(current.index())) {
deleteIndex(current.index());
}
}
}
}
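
The schedule-then-maybe-cancel pattern above (track the ScheduledFuture, cancel it if the index reappears in the metadata, otherwise let RemoveDanglingIndex fire) is the core mechanism. A self-contained sketch of the same pattern with plain java.util.concurrent standing in for Elasticsearch's ThreadPool (hypothetical names; the real code also synchronizes on danglingMutex):

import java.util.Map;
import java.util.concurrent.*;

class DanglingScheduleSketch {
    final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    final Map<String, ScheduledFuture<?>> dangling = new ConcurrentHashMap<String, ScheduledFuture<?>>();

    void markDangling(final String index, long timeoutMillis) {
        // schedule deletion, but keep the future so it can be cancelled
        dangling.put(index, scheduler.schedule(new Runnable() {
            @Override
            public void run() {
                if (dangling.remove(index) != null) { // still dangling when the timeout fired
                    System.out.println("deleting dangling index " + index);
                }
            }
        }, timeoutMillis, TimeUnit.MILLISECONDS));
    }

    void indexAppearedInMetaData(String index) {
        ScheduledFuture<?> future = dangling.remove(index);
        if (future != null) {
            future.cancel(false); // no longer dangling; its state files are kept
        }
    }
}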
@@ -472,4 +527,36 @@ public class LocalGatewayMetaState extends AbstractComponent implements ClusterS
logger.info("conversion to new metadata location and format done, backup create at [{}]", backupFile.getAbsolutePath());
}
class RemoveDanglingIndex implements Runnable {
private final String index;
RemoveDanglingIndex(String index) {
this.index = index;
}
@Override
public void run() {
synchronized (danglingMutex) {
DanglingIndex remove = danglingIndices.remove(index);
// no longer there...
if (remove == null) {
return;
}
logger.info("[{}] deleting dangling index", index);
FileSystemUtils.deleteRecursively(nodeEnv.indexLocations(new Index(index)));
}
}
}
static class DanglingIndex {
public final String index;
public final ScheduledFuture future;
DanglingIndex(String index, ScheduledFuture future) {
this.index = index;
this.future = future;
}
}
}

View File: LocalGatewayShardsState.java

@@ -35,6 +35,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.*;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.gateway.local.state.meta.LocalGatewayMetaState;
import org.elasticsearch.index.shard.ShardId;
import java.io.File;
@@ -50,13 +51,15 @@ import java.util.Set;
public class LocalGatewayShardsState extends AbstractComponent implements ClusterStateListener {
private final NodeEnvironment nodeEnv;
private final LocalGatewayMetaState metaState;
private volatile Map<ShardId, ShardStateInfo> currentState = Maps.newHashMap();
@Inject
public LocalGatewayShardsState(Settings settings, NodeEnvironment nodeEnv, TransportNodesListGatewayStartedShards listGatewayStartedShards) throws Exception {
public LocalGatewayShardsState(Settings settings, NodeEnvironment nodeEnv, TransportNodesListGatewayStartedShards listGatewayStartedShards, LocalGatewayMetaState metaState) throws Exception {
super(settings);
this.nodeEnv = nodeEnv;
this.metaState = metaState;
listGatewayStartedShards.initGateway(this);
if (DiscoveryNode.dataNode(settings)) {
@@ -154,7 +157,9 @@ public class LocalGatewayShardsState extends AbstractComponent implements Cluste
for (Map.Entry<ShardId, ShardStateInfo> entry : currentState.entrySet()) {
ShardId shardId = entry.getKey();
if (!newState.containsKey(shardId)) {
deleteShardState(shardId);
if (!metaState.isDangling(shardId.index().name())) {
deleteShardState(shardId);
}
}
}

View File: LocalIndexShardGateway.java

@@ -21,8 +21,11 @@ package org.elasticsearch.index.gateway.local;
import com.google.common.io.Closeables;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.InputStreamStreamInput;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.gateway.IndexShardGateway;
@@ -97,12 +100,21 @@ public class LocalIndexShardGateway extends AbstractIndexShardComponent implemen
long translogId = -1;
try {
if (IndexReader.indexExists(indexShard.store().directory())) {
version = IndexReader.getCurrentVersion(indexShard.store().directory());
Map<String, String> commitUserData = IndexReader.getCommitUserData(indexShard.store().directory());
if (commitUserData.containsKey(Translog.TRANSLOG_ID_KEY)) {
translogId = Long.parseLong(commitUserData.get(Translog.TRANSLOG_ID_KEY));
if (indexShouldExists) {
version = IndexReader.getCurrentVersion(indexShard.store().directory());
Map<String, String> commitUserData = IndexReader.getCommitUserData(indexShard.store().directory());
if (commitUserData.containsKey(Translog.TRANSLOG_ID_KEY)) {
translogId = Long.parseLong(commitUserData.get(Translog.TRANSLOG_ID_KEY));
} else {
translogId = version;
}
logger.trace("using existing shard data, translog id [{}]", translogId);
} else {
translogId = version;
// it exists in the directory, but shouldn't exist on the FS, it's a leftover (possibly dangling)
// this is a "new index create" API call, so it is better to clean the leftover than to reuse its data
logger.trace("cleaning existing shard, shouldn't exist");
IndexWriter writer = new IndexWriter(indexShard.store().directory(), new IndexWriterConfig(Lucene.VERSION, Lucene.STANDARD_ANALYZER).setOpenMode(IndexWriterConfig.OpenMode.CREATE));
writer.close();
}
} else if (indexShouldExists) {
throw new IndexShardGatewayRecoveryException(shardId(), "shard allocated for local recovery (post api), should exist, but doesn't");
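
The cleanup branch above leans on a Lucene guarantee: opening an IndexWriter with OpenMode.CREATE discards whatever segments already exist in the directory. A standalone sketch of just that behavior, assuming Lucene 3.6 (roughly the version Elasticsearch used at the time of this commit):

import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.RAMDirectory;
import org.apache.lucene.util.Version;

public class WipeLeftoverShardSketch {
    public static void main(String[] args) throws Exception {
        Directory dir = new RAMDirectory(); // stands in for the shard's store directory
        IndexWriterConfig config = new IndexWriterConfig(Version.LUCENE_36,
                new StandardAnalyzer(Version.LUCENE_36))
                .setOpenMode(IndexWriterConfig.OpenMode.CREATE);
        // open-and-close commits an empty index, wiping the leftover data
        new IndexWriter(dir, config).close();
    }
}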

View File: IndicesStore.java

@@ -30,10 +30,7 @@ import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.FileSystemUtils;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.service.IndexService;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService;
@@ -41,8 +38,6 @@ import org.elasticsearch.node.settings.NodeSettingsService;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.File;
import java.util.Map;
import java.util.concurrent.ScheduledFuture;
/**
*
@@ -88,28 +83,12 @@ public class IndicesStore extends AbstractComponent implements ClusterStateListe
private final ThreadPool threadPool;
private final TimeValue danglingTimeout;
private final Map<String, DanglingIndex> danglingIndices = ConcurrentCollections.newConcurrentMap();
private final Object danglingMutex = new Object();
private volatile String rateLimitingType;
private volatile ByteSizeValue rateLimitingThrottle;
private final StoreRateLimiting rateLimiting = new StoreRateLimiting();
private final ApplySettings applySettings = new ApplySettings();
static class DanglingIndex {
public final String index;
public final ScheduledFuture future;
DanglingIndex(String index, ScheduledFuture future) {
this.index = index;
this.future = future;
}
}
@Inject
public IndicesStore(Settings settings, NodeEnvironment nodeEnv, NodeSettingsService nodeSettingsService, IndicesService indicesService, ClusterService clusterService, ThreadPool threadPool) {
super(settings);
@@ -124,8 +103,6 @@ public class IndicesStore extends AbstractComponent implements ClusterStateListe
this.rateLimitingThrottle = componentSettings.getAsBytesSize("throttle.max_bytes_per_sec", new ByteSizeValue(0));
rateLimiting.setMaxRate(rateLimitingThrottle);
this.danglingTimeout = componentSettings.getAsTime("dangling_timeout", TimeValue.timeValueHours(2));
logger.debug("using indices.store.throttle.type [{}], with index.store.throttle.max_bytes_per_sec [{}]", rateLimitingType, rateLimitingThrottle);
nodeSettingsService.addListener(applySettings);
@@ -221,62 +198,6 @@ public class IndicesStore extends AbstractComponent implements ClusterStateListe
}
}
}
if (danglingTimeout.millis() >= 0) {
synchronized (danglingMutex) {
for (String danglingIndex : danglingIndices.keySet()) {
if (event.state().metaData().hasIndex(danglingIndex)) {
logger.debug("[{}] no longer dangling (created), removing", danglingIndex);
DanglingIndex removed = danglingIndices.remove(danglingIndex);
removed.future.cancel(false);
}
}
// delete indices that are no longer part of the metadata
try {
for (String indexName : nodeEnv.findAllIndices()) {
// if we have the index in the metadata, don't delete it
if (event.state().metaData().hasIndex(indexName)) {
continue;
}
if (danglingIndices.containsKey(indexName)) {
// already dangling, continue
continue;
}
if (danglingTimeout.millis() == 0) {
logger.info("[{}] dangling index, exists on local file system, but not in cluster metadata, timeout set to 0, deleting now", indexName);
FileSystemUtils.deleteRecursively(nodeEnv.indexLocations(new Index(indexName)));
} else {
logger.info("[{}] dangling index, exists on local file system, but not in cluster metadata, scheduling to delete in [{}]", indexName, danglingTimeout);
danglingIndices.put(indexName, new DanglingIndex(indexName, threadPool.schedule(danglingTimeout, ThreadPool.Names.SAME, new RemoveDanglingIndex(indexName))));
}
}
} catch (Exception e) {
logger.warn("failed to find dangling indices", e);
}
}
}
}
}
class RemoveDanglingIndex implements Runnable {
private final String index;
RemoveDanglingIndex(String index) {
this.index = index;
}
@Override
public void run() {
synchronized (danglingMutex) {
DanglingIndex remove = danglingIndices.remove(index);
// no longer there...
if (remove == null) {
return;
}
logger.info("[{}] deleting dangling index", index);
FileSystemUtils.deleteRecursively(nodeEnv.indexLocations(new Index(index)));
}
}
}
}

View File: LocalGatewayIndexStateTests.java

@@ -323,4 +323,117 @@ public class LocalGatewayIndexStateTests extends AbstractNodesTests
assertThat(client("node1").prepareCount().setQuery(matchAllQuery()).execute().actionGet().count(), equalTo(1l));
}
}
@Test
public void testDanglingIndices() throws Exception {
logger.info("--> cleaning nodes");
buildNode("node1", settingsBuilder().put("gateway.type", "local").build());
buildNode("node2", settingsBuilder().put("gateway.type", "local").build());
cleanAndCloseNodes();
logger.info("--> starting two nodes");
startNode("node1", settingsBuilder().put("gateway.type", "local").put("index.number_of_shards", 1).put("index.number_of_replicas", 1).build());
startNode("node2", settingsBuilder().put("gateway.type", "local").put("index.number_of_shards", 1).put("index.number_of_replicas", 1).build());
logger.info("--> indexing a simple document");
client("node1").prepareIndex("test", "type1", "1").setSource("field1", "value1").setRefresh(true).execute().actionGet();
logger.info("--> waiting for green status");
ClusterHealthResponse health = client("node1").admin().cluster().prepareHealth().setWaitForGreenStatus().setWaitForNodes("2").execute().actionGet();
assertThat(health.timedOut(), equalTo(false));
logger.info("--> verify 1 doc in the index");
for (int i = 0; i < 10; i++) {
assertThat(client("node1").prepareCount().setQuery(matchAllQuery()).execute().actionGet().count(), equalTo(1l));
}
assertThat(client("node1").prepareGet("test", "type1", "1").execute().actionGet().exists(), equalTo(true));
logger.info("--> shutting down the nodes");
Gateway gateway1 = ((InternalNode) node("node1")).injector().getInstance(Gateway.class);
closeNode("node1");
closeNode("node2");
logger.info("--> deleting the data for the first node");
gateway1.reset();
logger.info("--> start the 2 nodes back, simulating dangling index (exists on second, doesn't exists on first)");
startNode("node1", settingsBuilder().put("gateway.type", "local").put("index.number_of_shards", 1).put("index.number_of_replicas", 1).build());
startNode("node2", settingsBuilder().put("gateway.type", "local").put("index.number_of_shards", 1).put("index.number_of_replicas", 1).build());
logger.info("--> waiting for green status");
health = client("node1").admin().cluster().prepareHealth().setWaitForGreenStatus().setWaitForNodes("2").execute().actionGet();
assertThat(health.timedOut(), equalTo(false));
logger.info("--> verify that the dangling index does not exists");
assertThat(client("node1").admin().indices().prepareExists("test").execute().actionGet().exists(), equalTo(false));
logger.info("--> shutdown the nodes");
closeNode("node1");
closeNode("node2");
logger.info("--> start the nodes back, but make sure we do recovery only after we have 2 nodes in the cluster");
startNode("node1", settingsBuilder().put("gateway.type", "local").put("index.number_of_shards", 1).put("index.number_of_replicas", 1).put("gateway.recover_after_nodes", 2).build());
startNode("node2", settingsBuilder().put("gateway.type", "local").put("index.number_of_shards", 1).put("index.number_of_replicas", 1).put("gateway.recover_after_nodes", 2).build());
logger.info("--> waiting for green status");
health = client("node1").admin().cluster().prepareHealth().setWaitForGreenStatus().setWaitForNodes("2").execute().actionGet();
assertThat(health.timedOut(), equalTo(false));
logger.info("--> verify that the dangling index does exists now!");
assertThat(client("node1").admin().indices().prepareExists("test").execute().actionGet().exists(), equalTo(true));
logger.info("--> verify the doc is there");
assertThat(client("node1").prepareGet("test", "type1", "1").execute().actionGet().exists(), equalTo(true));
}
@Test
public void testDanglingIndicesStillDanglingAndCreatingSameIndex() throws Exception {
logger.info("--> cleaning nodes");
buildNode("node1", settingsBuilder().put("gateway.type", "local").build());
buildNode("node2", settingsBuilder().put("gateway.type", "local").build());
cleanAndCloseNodes();
logger.info("--> starting two nodes");
startNode("node1", settingsBuilder().put("gateway.type", "local").put("index.number_of_shards", 1).put("index.number_of_replicas", 1).build());
startNode("node2", settingsBuilder().put("gateway.type", "local").put("index.number_of_shards", 1).put("index.number_of_replicas", 1).build());
logger.info("--> indexing a simple document");
client("node1").prepareIndex("test", "type1", "1").setSource("field1", "value1").setRefresh(true).execute().actionGet();
logger.info("--> waiting for green status");
ClusterHealthResponse health = client("node1").admin().cluster().prepareHealth().setWaitForGreenStatus().setWaitForNodes("2").execute().actionGet();
assertThat(health.timedOut(), equalTo(false));
logger.info("--> verify 1 doc in the index");
for (int i = 0; i < 10; i++) {
assertThat(client("node1").prepareCount().setQuery(matchAllQuery()).execute().actionGet().count(), equalTo(1l));
}
logger.info("--> shutting down the nodes");
Gateway gateway1 = ((InternalNode) node("node1")).injector().getInstance(Gateway.class);
closeNode("node1");
closeNode("node2");
logger.info("--> deleting the data for the first node");
gateway1.reset();
logger.info("--> start the 2 nodes back, simulating dangling index (exists on second, doesn't exists on first)");
startNode("node1", settingsBuilder().put("gateway.type", "local").put("index.number_of_shards", 1).put("index.number_of_replicas", 1).build());
startNode("node2", settingsBuilder().put("gateway.type", "local").put("index.number_of_shards", 1).put("index.number_of_replicas", 1).build());
logger.info("--> waiting for green status");
health = client("node1").admin().cluster().prepareHealth().setWaitForGreenStatus().setWaitForNodes("2").execute().actionGet();
assertThat(health.timedOut(), equalTo(false));
logger.info("--> verify that the dangling index does not exists");
assertThat(client("node1").admin().indices().prepareExists("test").execute().actionGet().exists(), equalTo(false));
logger.info("--> close the first node, so we remain with the second that has the dangling index");
closeNode("node1");
logger.info("--> index a different doc");
client("node2").prepareIndex("test", "type1", "2").setSource("field1", "value2").setRefresh(true).execute().actionGet();
assertThat(client("node2").prepareGet("test", "type1", "1").execute().actionGet().exists(), equalTo(false));
assertThat(client("node2").prepareGet("test", "type1", "2").execute().actionGet().exists(), equalTo(true));
}
}