diff --git a/src/main/java/org/elasticsearch/env/NodeEnvironment.java b/src/main/java/org/elasticsearch/env/NodeEnvironment.java index 8600e0e44ec..e018e07499c 100644 --- a/src/main/java/org/elasticsearch/env/NodeEnvironment.java +++ b/src/main/java/org/elasticsearch/env/NodeEnvironment.java @@ -21,9 +21,10 @@ package org.elasticsearch.env; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Sets; - +import org.apache.lucene.index.IndexWriter; import org.apache.lucene.store.*; import org.apache.lucene.util.IOUtils; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.SuppressForbidden; @@ -37,6 +38,7 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.Index; import org.elasticsearch.index.settings.IndexSettings; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.store.FsDirectoryService; import org.elasticsearch.monitor.fs.FsStats; import org.elasticsearch.monitor.fs.JmxFsProbe; @@ -297,19 +299,57 @@ public class NodeEnvironment extends AbstractComponent implements Closeable { } /** - * Deletes a shard data directory. Note: this method assumes that the shard lock is acquired + * Acquires, then releases, all {@code write.lock} files in the given + * shard paths. The "write.lock" file is assumed to be under the shard + * path's "index" directory as used by Elasticsearch. + * + * @throws ElasticsearchException if any of the locks could not be acquired + */ + public static void acquireFSLockForPaths(@IndexSettings Settings indexSettings, Path... shardPaths) throws IOException { + Lock[] locks = new Lock[shardPaths.length]; + Directory[] dirs = new Directory[shardPaths.length]; + try { + for (int i = 0; i < shardPaths.length; i++) { + // resolve the directory the shard actually lives in + Path p = shardPaths[i].resolve("index"); + // open a directory (will be immediately closed) on the shard's location + dirs[i] = new SimpleFSDirectory(p, FsDirectoryService.buildLockFactory(indexSettings)); + // create a lock for the "write.lock" file + locks[i] = dirs[i].makeLock(IndexWriter.WRITE_LOCK_NAME); + if (locks[i].obtain() == false) { + throw new ElasticsearchException("unable to acquire " + + IndexWriter.WRITE_LOCK_NAME + " for " + p); + } + } + } finally { + IOUtils.closeWhileHandlingException(locks); + IOUtils.closeWhileHandlingException(dirs); + } + } + + /** + * Deletes a shard data directory. Note: this method assumes that the shard + * lock is acquired. This method will also attempt to acquire the write + * locks for the shard's paths before deleting the data, but this is best + * effort, as the lock is released before the deletion happens in order to + * allow the folder to be deleted * * @param lock the shards lock * @throws IOException if an IOException occurs + * @throws ElasticsearchException if the write.lock is not acquirable */ public void deleteShardDirectoryUnderLock(ShardLock lock, @IndexSettings Settings indexSettings) throws IOException { assert indexSettings != ImmutableSettings.EMPTY; final ShardId shardId = lock.getShardId(); assert isShardLocked(shardId) : "shard " + shardId + " is not locked"; final Path[] paths = availableShardPaths(shardId); + logger.trace("acquiring locks for {}, paths: [{}]", shardId, paths); + acquireFSLockForPaths(indexSettings, paths); IOUtils.rm(paths); if (hasCustomDataPath(indexSettings)) { Path customLocation = resolveCustomLocation(indexSettings, shardId); + logger.trace("acquiring lock for {}, custom path: [{}]", shardId, customLocation); + acquireFSLockForPaths(indexSettings, customLocation); logger.trace("deleting custom shard {} directory [{}]", shardId, customLocation); IOUtils.rm(customLocation); } diff --git a/src/main/java/org/elasticsearch/index/store/FsDirectoryService.java b/src/main/java/org/elasticsearch/index/store/FsDirectoryService.java index f67bc340125..afc2dac3359 100644 --- a/src/main/java/org/elasticsearch/index/store/FsDirectoryService.java +++ b/src/main/java/org/elasticsearch/index/store/FsDirectoryService.java @@ -19,12 +19,6 @@ package org.elasticsearch.index.store; -import java.io.IOException; -import java.nio.file.Files; -import java.nio.file.Path; -import java.util.Collections; -import java.util.Set; - import com.google.common.collect.Sets; import org.apache.lucene.store.*; import org.apache.lucene.util.Constants; @@ -32,12 +26,13 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.metrics.CounterMetric; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.settings.IndexSettings; -import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardPath; -import org.elasticsearch.index.store.DirectoryService; -import org.elasticsearch.index.store.IndexStore; -import org.elasticsearch.index.store.IndexStoreModule; -import org.elasticsearch.index.store.StoreException; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Collections; +import java.util.Set; /** */ @@ -65,7 +60,7 @@ public class FsDirectoryService extends DirectoryService implements StoreRateLim return indexStore.rateLimiting(); } - protected final LockFactory buildLockFactory() throws IOException { + public static LockFactory buildLockFactory(@IndexSettings Settings indexSettings) { String fsLock = indexSettings.get("index.store.fs.lock", indexSettings.get("index.store.fs.fs_lock", "native")); LockFactory lockFactory; if (fsLock.equals("native")) { @@ -73,11 +68,19 @@ public class FsDirectoryService extends DirectoryService implements StoreRateLim } else if (fsLock.equals("simple")) { lockFactory = SimpleFSLockFactory.INSTANCE; } else { - throw new StoreException(shardId, "unrecognized fs_lock \"" + fsLock + "\": must be native or simple"); + throw new IllegalArgumentException("unrecognized fs_lock \"" + fsLock + "\": must be native or simple"); } return lockFactory; } + protected final LockFactory buildLockFactory() throws IOException { + try { + return buildLockFactory(indexSettings); + } catch (IllegalArgumentException e) { + throw new StoreException(shardId, "unable to build lock factory", e); + } + } + @Override public Directory newDirectory() throws IOException { final Path location = path.resolveIndex(); diff --git a/src/test/java/org/elasticsearch/index/IndexWithShadowReplicasTests.java b/src/test/java/org/elasticsearch/index/IndexWithShadowReplicasTests.java index 17b095d7e26..d3a1a00e98b 100644 --- a/src/test/java/org/elasticsearch/index/IndexWithShadowReplicasTests.java +++ b/src/test/java/org/elasticsearch/index/IndexWithShadowReplicasTests.java @@ -66,15 +66,20 @@ import static org.hamcrest.Matchers.*; @ElasticsearchIntegrationTest.ClusterScope(scope = ElasticsearchIntegrationTest.Scope.TEST, numDataNodes = 0) public class IndexWithShadowReplicasTests extends ElasticsearchIntegrationTest { + private Settings nodeSettings() { + return ImmutableSettings.builder() + .put("node.add_id_to_custom_path", false) + .put("node.enable_custom_paths", true) + .put("index.store.fs.fs_lock", randomFrom("native", "simple")) + .build(); + } + /** * Tests the case where we create an index without shadow replicas, snapshot it and then restore into * an index with shadow replicas enabled. */ public void testRestoreToShadow() throws ExecutionException, InterruptedException { - Settings nodeSettings = ImmutableSettings.builder() - .put("node.add_id_to_custom_path", false) - .put("node.enable_custom_paths", true) - .build(); + Settings nodeSettings = nodeSettings(); internalCluster().startNodesAsync(3, nodeSettings).get(); final Path dataPath = createTempDir(); @@ -130,10 +135,7 @@ public class IndexWithShadowReplicasTests extends ElasticsearchIntegrationTest { @Test public void testIndexWithFewDocuments() throws Exception { - Settings nodeSettings = ImmutableSettings.builder() - .put("node.add_id_to_custom_path", false) - .put("node.enable_custom_paths", true) - .build(); + Settings nodeSettings = nodeSettings(); internalCluster().startNodesAsync(3, nodeSettings).get(); final String IDX = "test"; @@ -196,10 +198,7 @@ public class IndexWithShadowReplicasTests extends ElasticsearchIntegrationTest { @Test public void testReplicaToPrimaryPromotion() throws Exception { - Settings nodeSettings = ImmutableSettings.builder() - .put("node.add_id_to_custom_path", false) - .put("node.enable_custom_paths", true) - .build(); + Settings nodeSettings = nodeSettings(); String node1 = internalCluster().startNode(nodeSettings); Path dataPath = createTempDir(); @@ -258,10 +257,7 @@ public class IndexWithShadowReplicasTests extends ElasticsearchIntegrationTest { @Test public void testPrimaryRelocation() throws Exception { - Settings nodeSettings = ImmutableSettings.builder() - .put("node.add_id_to_custom_path", false) - .put("node.enable_custom_paths", true) - .build(); + Settings nodeSettings = nodeSettings(); String node1 = internalCluster().startNode(nodeSettings); Path dataPath = createTempDir(); @@ -322,10 +318,7 @@ public class IndexWithShadowReplicasTests extends ElasticsearchIntegrationTest { @Test public void testPrimaryRelocationWithConcurrentIndexing() throws Exception { - Settings nodeSettings = ImmutableSettings.builder() - .put("node.add_id_to_custom_path", false) - .put("node.enable_custom_paths", true) - .build(); + Settings nodeSettings = nodeSettings(); String node1 = internalCluster().startNode(nodeSettings); Path dataPath = createTempDir(); @@ -490,10 +483,7 @@ public class IndexWithShadowReplicasTests extends ElasticsearchIntegrationTest { @Test public void testIndexWithShadowReplicasCleansUp() throws Exception { - Settings nodeSettings = ImmutableSettings.builder() - .put("node.add_id_to_custom_path", false) - .put("node.enable_custom_paths", true) - .build(); + Settings nodeSettings = nodeSettings(); int nodeCount = randomIntBetween(2, 5); internalCluster().startNodesAsync(nodeCount, nodeSettings).get(); @@ -535,10 +525,7 @@ public class IndexWithShadowReplicasTests extends ElasticsearchIntegrationTest { */ @Test public void testShadowReplicaNaturalRelocation() throws Exception { - Settings nodeSettings = ImmutableSettings.builder() - .put("node.add_id_to_custom_path", false) - .put("node.enable_custom_paths", true) - .build(); + Settings nodeSettings = nodeSettings(); internalCluster().startNodesAsync(2, nodeSettings).get(); Path dataPath = createTempDir(); @@ -592,10 +579,7 @@ public class IndexWithShadowReplicasTests extends ElasticsearchIntegrationTest { @Test public void testShadowReplicasUsingFieldData() throws Exception { - Settings nodeSettings = ImmutableSettings.builder() - .put("node.add_id_to_custom_path", false) - .put("node.enable_custom_paths", true) - .build(); + Settings nodeSettings = nodeSettings(); internalCluster().startNodesAsync(3, nodeSettings).get(); Path dataPath = createTempDir(); @@ -630,10 +614,7 @@ public class IndexWithShadowReplicasTests extends ElasticsearchIntegrationTest { @Test public void testIndexOnSharedFSRecoversToAnyNode() throws Exception { - Settings nodeSettings = ImmutableSettings.builder() - .put("node.add_id_to_custom_path", false) - .put("node.enable_custom_paths", true) - .build(); + Settings nodeSettings = nodeSettings(); internalCluster().startNode(nodeSettings); Path dataPath = createTempDir(); diff --git a/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index df6be000aa0..eb50b5e331c 100644 --- a/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -18,6 +18,7 @@ */ package org.elasticsearch.index.shard; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.routing.MutableShardRouting; import org.elasticsearch.cluster.routing.ShardRouting; @@ -25,13 +26,14 @@ import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.env.NodeEnvironment; +import org.elasticsearch.env.ShardLock; import org.elasticsearch.index.IndexService; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.test.DummyShardLock; import org.elasticsearch.test.ElasticsearchSingleNodeTest; import org.junit.Test; -import java.io.Closeable; import java.io.IOException; import java.nio.file.Path; import java.util.HashSet; @@ -93,6 +95,33 @@ public class IndexShardTests extends ElasticsearchSingleNodeTest { } } + @Test + public void testLockTryingToDelete() throws Exception { + createIndex("test"); + ensureGreen(); + //IndicesService indicesService = getInstanceFromNode(IndicesService.class); + NodeEnvironment env = getInstanceFromNode(NodeEnvironment.class); + Path[] shardPaths = env.availableShardPaths(new ShardId("test", 0)); + logger.info("--> paths: [{}]", shardPaths); + // Should not be able to acquire the lock because it's already open + try { + NodeEnvironment.acquireFSLockForPaths(ImmutableSettings.EMPTY, shardPaths); + fail("should not have been able to acquire the lock"); + } catch (ElasticsearchException e) { + assertTrue("msg: " + e.getMessage(), e.getMessage().contains("unable to acquire write.lock")); + } + // Test without the regular shard lock to assume we can acquire it + // (worst case, meaning that the shard lock could be acquired and + // we're green to delete the shard's directory) + ShardLock sLock = new DummyShardLock(new ShardId("test", 0)); + try { + env.deleteShardDirectoryUnderLock(sLock, ImmutableSettings.builder().build()); + fail("should not have been able to delete the directory"); + } catch (ElasticsearchException e) { + assertTrue("msg: " + e.getMessage(), e.getMessage().contains("unable to acquire write.lock")); + } + } + public void testPersistenceStateMetadataPersistence() throws Exception { createIndex("test"); ensureGreen();