Merge remote-tracking branch 'dakrone/acquire-write-lock-on-delete'

This commit is contained in:
Lee Hinman 2015-05-12 13:23:09 -06:00
commit 990c3a9093
4 changed files with 105 additions and 52 deletions

View File

@ -21,9 +21,10 @@ package org.elasticsearch.env;
import com.google.common.collect.ImmutableSet; import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.store.*; import org.apache.lucene.store.*;
import org.apache.lucene.util.IOUtils; import org.apache.lucene.util.IOUtils;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.SuppressForbidden; import org.elasticsearch.common.SuppressForbidden;
@ -37,6 +38,7 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.Index; import org.elasticsearch.index.Index;
import org.elasticsearch.index.settings.IndexSettings; import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.FsDirectoryService;
import org.elasticsearch.monitor.fs.FsStats; import org.elasticsearch.monitor.fs.FsStats;
import org.elasticsearch.monitor.fs.JmxFsProbe; import org.elasticsearch.monitor.fs.JmxFsProbe;
@ -297,19 +299,57 @@ public class NodeEnvironment extends AbstractComponent implements Closeable {
} }
/** /**
* Deletes a shard data directory. Note: this method assumes that the shard lock is acquired * Acquires, then releases, all {@code write.lock} files in the given
* shard paths. The "write.lock" file is assumed to be under the shard
* path's "index" directory as used by Elasticsearch.
*
* @throws ElasticsearchException if any of the locks could not be acquired
*/
public static void acquireFSLockForPaths(@IndexSettings Settings indexSettings, Path... shardPaths) throws IOException {
Lock[] locks = new Lock[shardPaths.length];
Directory[] dirs = new Directory[shardPaths.length];
try {
for (int i = 0; i < shardPaths.length; i++) {
// resolve the directory the shard actually lives in
Path p = shardPaths[i].resolve("index");
// open a directory (will be immediately closed) on the shard's location
dirs[i] = new SimpleFSDirectory(p, FsDirectoryService.buildLockFactory(indexSettings));
// create a lock for the "write.lock" file
locks[i] = dirs[i].makeLock(IndexWriter.WRITE_LOCK_NAME);
if (locks[i].obtain() == false) {
throw new ElasticsearchException("unable to acquire " +
IndexWriter.WRITE_LOCK_NAME + " for " + p);
}
}
} finally {
IOUtils.closeWhileHandlingException(locks);
IOUtils.closeWhileHandlingException(dirs);
}
}
/**
* Deletes a shard data directory. Note: this method assumes that the shard
* lock is acquired. This method will also attempt to acquire the write
* locks for the shard's paths before deleting the data, but this is best
* effort, as the lock is released before the deletion happens in order to
* allow the folder to be deleted
* *
* @param lock the shards lock * @param lock the shards lock
* @throws IOException if an IOException occurs * @throws IOException if an IOException occurs
* @throws ElasticsearchException if the write.lock is not acquirable
*/ */
public void deleteShardDirectoryUnderLock(ShardLock lock, @IndexSettings Settings indexSettings) throws IOException { public void deleteShardDirectoryUnderLock(ShardLock lock, @IndexSettings Settings indexSettings) throws IOException {
assert indexSettings != ImmutableSettings.EMPTY; assert indexSettings != ImmutableSettings.EMPTY;
final ShardId shardId = lock.getShardId(); final ShardId shardId = lock.getShardId();
assert isShardLocked(shardId) : "shard " + shardId + " is not locked"; assert isShardLocked(shardId) : "shard " + shardId + " is not locked";
final Path[] paths = availableShardPaths(shardId); final Path[] paths = availableShardPaths(shardId);
logger.trace("acquiring locks for {}, paths: [{}]", shardId, paths);
acquireFSLockForPaths(indexSettings, paths);
IOUtils.rm(paths); IOUtils.rm(paths);
if (hasCustomDataPath(indexSettings)) { if (hasCustomDataPath(indexSettings)) {
Path customLocation = resolveCustomLocation(indexSettings, shardId); Path customLocation = resolveCustomLocation(indexSettings, shardId);
logger.trace("acquiring lock for {}, custom path: [{}]", shardId, customLocation);
acquireFSLockForPaths(indexSettings, customLocation);
logger.trace("deleting custom shard {} directory [{}]", shardId, customLocation); logger.trace("deleting custom shard {} directory [{}]", shardId, customLocation);
IOUtils.rm(customLocation); IOUtils.rm(customLocation);
} }

View File

@ -19,12 +19,6 @@
package org.elasticsearch.index.store; package org.elasticsearch.index.store;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collections;
import java.util.Set;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import org.apache.lucene.store.*; import org.apache.lucene.store.*;
import org.apache.lucene.util.Constants; import org.apache.lucene.util.Constants;
@ -32,12 +26,13 @@ import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.metrics.CounterMetric; import org.elasticsearch.common.metrics.CounterMetric;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.settings.IndexSettings; import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardPath; import org.elasticsearch.index.shard.ShardPath;
import org.elasticsearch.index.store.DirectoryService;
import org.elasticsearch.index.store.IndexStore; import java.io.IOException;
import org.elasticsearch.index.store.IndexStoreModule; import java.nio.file.Files;
import org.elasticsearch.index.store.StoreException; import java.nio.file.Path;
import java.util.Collections;
import java.util.Set;
/** /**
*/ */
@ -65,7 +60,7 @@ public class FsDirectoryService extends DirectoryService implements StoreRateLim
return indexStore.rateLimiting(); return indexStore.rateLimiting();
} }
protected final LockFactory buildLockFactory() throws IOException { public static LockFactory buildLockFactory(@IndexSettings Settings indexSettings) {
String fsLock = indexSettings.get("index.store.fs.lock", indexSettings.get("index.store.fs.fs_lock", "native")); String fsLock = indexSettings.get("index.store.fs.lock", indexSettings.get("index.store.fs.fs_lock", "native"));
LockFactory lockFactory; LockFactory lockFactory;
if (fsLock.equals("native")) { if (fsLock.equals("native")) {
@ -73,11 +68,19 @@ public class FsDirectoryService extends DirectoryService implements StoreRateLim
} else if (fsLock.equals("simple")) { } else if (fsLock.equals("simple")) {
lockFactory = SimpleFSLockFactory.INSTANCE; lockFactory = SimpleFSLockFactory.INSTANCE;
} else { } else {
throw new StoreException(shardId, "unrecognized fs_lock \"" + fsLock + "\": must be native or simple"); throw new IllegalArgumentException("unrecognized fs_lock \"" + fsLock + "\": must be native or simple");
} }
return lockFactory; return lockFactory;
} }
protected final LockFactory buildLockFactory() throws IOException {
try {
return buildLockFactory(indexSettings);
} catch (IllegalArgumentException e) {
throw new StoreException(shardId, "unable to build lock factory", e);
}
}
@Override @Override
public Directory newDirectory() throws IOException { public Directory newDirectory() throws IOException {
final Path location = path.resolveIndex(); final Path location = path.resolveIndex();

View File

@ -66,15 +66,20 @@ import static org.hamcrest.Matchers.*;
@ElasticsearchIntegrationTest.ClusterScope(scope = ElasticsearchIntegrationTest.Scope.TEST, numDataNodes = 0) @ElasticsearchIntegrationTest.ClusterScope(scope = ElasticsearchIntegrationTest.Scope.TEST, numDataNodes = 0)
public class IndexWithShadowReplicasTests extends ElasticsearchIntegrationTest { public class IndexWithShadowReplicasTests extends ElasticsearchIntegrationTest {
private Settings nodeSettings() {
return ImmutableSettings.builder()
.put("node.add_id_to_custom_path", false)
.put("node.enable_custom_paths", true)
.put("index.store.fs.fs_lock", randomFrom("native", "simple"))
.build();
}
/** /**
* Tests the case where we create an index without shadow replicas, snapshot it and then restore into * Tests the case where we create an index without shadow replicas, snapshot it and then restore into
* an index with shadow replicas enabled. * an index with shadow replicas enabled.
*/ */
public void testRestoreToShadow() throws ExecutionException, InterruptedException { public void testRestoreToShadow() throws ExecutionException, InterruptedException {
Settings nodeSettings = ImmutableSettings.builder() Settings nodeSettings = nodeSettings();
.put("node.add_id_to_custom_path", false)
.put("node.enable_custom_paths", true)
.build();
internalCluster().startNodesAsync(3, nodeSettings).get(); internalCluster().startNodesAsync(3, nodeSettings).get();
final Path dataPath = createTempDir(); final Path dataPath = createTempDir();
@ -130,10 +135,7 @@ public class IndexWithShadowReplicasTests extends ElasticsearchIntegrationTest {
@Test @Test
public void testIndexWithFewDocuments() throws Exception { public void testIndexWithFewDocuments() throws Exception {
Settings nodeSettings = ImmutableSettings.builder() Settings nodeSettings = nodeSettings();
.put("node.add_id_to_custom_path", false)
.put("node.enable_custom_paths", true)
.build();
internalCluster().startNodesAsync(3, nodeSettings).get(); internalCluster().startNodesAsync(3, nodeSettings).get();
final String IDX = "test"; final String IDX = "test";
@ -196,10 +198,7 @@ public class IndexWithShadowReplicasTests extends ElasticsearchIntegrationTest {
@Test @Test
public void testReplicaToPrimaryPromotion() throws Exception { public void testReplicaToPrimaryPromotion() throws Exception {
Settings nodeSettings = ImmutableSettings.builder() Settings nodeSettings = nodeSettings();
.put("node.add_id_to_custom_path", false)
.put("node.enable_custom_paths", true)
.build();
String node1 = internalCluster().startNode(nodeSettings); String node1 = internalCluster().startNode(nodeSettings);
Path dataPath = createTempDir(); Path dataPath = createTempDir();
@ -258,10 +257,7 @@ public class IndexWithShadowReplicasTests extends ElasticsearchIntegrationTest {
@Test @Test
public void testPrimaryRelocation() throws Exception { public void testPrimaryRelocation() throws Exception {
Settings nodeSettings = ImmutableSettings.builder() Settings nodeSettings = nodeSettings();
.put("node.add_id_to_custom_path", false)
.put("node.enable_custom_paths", true)
.build();
String node1 = internalCluster().startNode(nodeSettings); String node1 = internalCluster().startNode(nodeSettings);
Path dataPath = createTempDir(); Path dataPath = createTempDir();
@ -322,10 +318,7 @@ public class IndexWithShadowReplicasTests extends ElasticsearchIntegrationTest {
@Test @Test
public void testPrimaryRelocationWithConcurrentIndexing() throws Exception { public void testPrimaryRelocationWithConcurrentIndexing() throws Exception {
Settings nodeSettings = ImmutableSettings.builder() Settings nodeSettings = nodeSettings();
.put("node.add_id_to_custom_path", false)
.put("node.enable_custom_paths", true)
.build();
String node1 = internalCluster().startNode(nodeSettings); String node1 = internalCluster().startNode(nodeSettings);
Path dataPath = createTempDir(); Path dataPath = createTempDir();
@ -490,10 +483,7 @@ public class IndexWithShadowReplicasTests extends ElasticsearchIntegrationTest {
@Test @Test
public void testIndexWithShadowReplicasCleansUp() throws Exception { public void testIndexWithShadowReplicasCleansUp() throws Exception {
Settings nodeSettings = ImmutableSettings.builder() Settings nodeSettings = nodeSettings();
.put("node.add_id_to_custom_path", false)
.put("node.enable_custom_paths", true)
.build();
int nodeCount = randomIntBetween(2, 5); int nodeCount = randomIntBetween(2, 5);
internalCluster().startNodesAsync(nodeCount, nodeSettings).get(); internalCluster().startNodesAsync(nodeCount, nodeSettings).get();
@ -535,10 +525,7 @@ public class IndexWithShadowReplicasTests extends ElasticsearchIntegrationTest {
*/ */
@Test @Test
public void testShadowReplicaNaturalRelocation() throws Exception { public void testShadowReplicaNaturalRelocation() throws Exception {
Settings nodeSettings = ImmutableSettings.builder() Settings nodeSettings = nodeSettings();
.put("node.add_id_to_custom_path", false)
.put("node.enable_custom_paths", true)
.build();
internalCluster().startNodesAsync(2, nodeSettings).get(); internalCluster().startNodesAsync(2, nodeSettings).get();
Path dataPath = createTempDir(); Path dataPath = createTempDir();
@ -592,10 +579,7 @@ public class IndexWithShadowReplicasTests extends ElasticsearchIntegrationTest {
@Test @Test
public void testShadowReplicasUsingFieldData() throws Exception { public void testShadowReplicasUsingFieldData() throws Exception {
Settings nodeSettings = ImmutableSettings.builder() Settings nodeSettings = nodeSettings();
.put("node.add_id_to_custom_path", false)
.put("node.enable_custom_paths", true)
.build();
internalCluster().startNodesAsync(3, nodeSettings).get(); internalCluster().startNodesAsync(3, nodeSettings).get();
Path dataPath = createTempDir(); Path dataPath = createTempDir();
@ -630,10 +614,7 @@ public class IndexWithShadowReplicasTests extends ElasticsearchIntegrationTest {
@Test @Test
public void testIndexOnSharedFSRecoversToAnyNode() throws Exception { public void testIndexOnSharedFSRecoversToAnyNode() throws Exception {
Settings nodeSettings = ImmutableSettings.builder() Settings nodeSettings = nodeSettings();
.put("node.add_id_to_custom_path", false)
.put("node.enable_custom_paths", true)
.build();
internalCluster().startNode(nodeSettings); internalCluster().startNode(nodeSettings);
Path dataPath = createTempDir(); Path dataPath = createTempDir();

View File

@ -18,6 +18,7 @@
*/ */
package org.elasticsearch.index.shard; package org.elasticsearch.index.shard;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.MutableShardRouting; import org.elasticsearch.cluster.routing.MutableShardRouting;
import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRouting;
@ -25,13 +26,14 @@ import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.env.ShardLock;
import org.elasticsearch.index.IndexService; import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.translog.Translog; import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.test.DummyShardLock;
import org.elasticsearch.test.ElasticsearchSingleNodeTest; import org.elasticsearch.test.ElasticsearchSingleNodeTest;
import org.junit.Test; import org.junit.Test;
import java.io.Closeable;
import java.io.IOException; import java.io.IOException;
import java.nio.file.Path; import java.nio.file.Path;
import java.util.HashSet; import java.util.HashSet;
@ -93,6 +95,33 @@ public class IndexShardTests extends ElasticsearchSingleNodeTest {
} }
} }
@Test
public void testLockTryingToDelete() throws Exception {
createIndex("test");
ensureGreen();
//IndicesService indicesService = getInstanceFromNode(IndicesService.class);
NodeEnvironment env = getInstanceFromNode(NodeEnvironment.class);
Path[] shardPaths = env.availableShardPaths(new ShardId("test", 0));
logger.info("--> paths: [{}]", shardPaths);
// Should not be able to acquire the lock because it's already open
try {
NodeEnvironment.acquireFSLockForPaths(ImmutableSettings.EMPTY, shardPaths);
fail("should not have been able to acquire the lock");
} catch (ElasticsearchException e) {
assertTrue("msg: " + e.getMessage(), e.getMessage().contains("unable to acquire write.lock"));
}
// Test without the regular shard lock to assume we can acquire it
// (worst case, meaning that the shard lock could be acquired and
// we're green to delete the shard's directory)
ShardLock sLock = new DummyShardLock(new ShardId("test", 0));
try {
env.deleteShardDirectoryUnderLock(sLock, ImmutableSettings.builder().build());
fail("should not have been able to delete the directory");
} catch (ElasticsearchException e) {
assertTrue("msg: " + e.getMessage(), e.getMessage().contains("unable to acquire write.lock"));
}
}
public void testPersistenceStateMetadataPersistence() throws Exception { public void testPersistenceStateMetadataPersistence() throws Exception {
createIndex("test"); createIndex("test");
ensureGreen(); ensureGreen();