From 136d36b724dabbe9c84ca9dac6901cef2c712fd2 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Fri, 20 Feb 2015 21:44:39 +0100 Subject: [PATCH] [ENV] NodeEnv should lock all shards for an index Today locking all shards only locks the shards that are present on the node or that still have a shard directory. This can lead to odd behavior if another shard that doesn't exist yet is allocated while all shards are supposed to be locked. --- .../action/index/NodeIndexDeletedAction.java | 10 ++--- .../elasticsearch/env/NodeEnvironment.java | 17 ++++---- .../elasticsearch/indices/IndicesService.java | 4 +- .../cluster/IndicesClusterStateService.java | 10 +++-- .../env/NodeEnvironmentTests.java | 39 ++++++++++--------- .../indices/IndicesServiceTest.java | 6 +-- 6 files changed, 47 insertions(+), 39 deletions(-) diff --git a/src/main/java/org/elasticsearch/cluster/action/index/NodeIndexDeletedAction.java b/src/main/java/org/elasticsearch/cluster/action/index/NodeIndexDeletedAction.java index 2c33a5251b2..6b5a150f38c 100644 --- a/src/main/java/org/elasticsearch/cluster/action/index/NodeIndexDeletedAction.java +++ b/src/main/java/org/elasticsearch/cluster/action/index/NodeIndexDeletedAction.java @@ -74,7 +74,7 @@ public class NodeIndexDeletedAction extends AbstractComponent { listeners.remove(listener); } - public void nodeIndexDeleted(final ClusterState clusterState, final String index, final String nodeId) throws ElasticsearchException { + public void nodeIndexDeleted(final ClusterState clusterState, final String index, final Settings indexSettings, final String nodeId) throws ElasticsearchException { final DiscoveryNodes nodes = clusterState.nodes(); if (nodes.localNodeMaster()) { threadPool.generic().execute(new AbstractRunnable() { @@ -91,7 +91,7 @@ public class NodeIndexDeletedAction extends AbstractComponent { logger.trace("[{}] not acking store deletion (not a data node)"); return; } - lockIndexAndAck(index, nodes, nodeId, clusterState); + lockIndexAndAck(index, nodes, nodeId, clusterState, indexSettings); } }); @@ -110,19 +110,19 @@ public class NodeIndexDeletedAction extends AbstractComponent { @Override protected void doRun() throws Exception { - lockIndexAndAck(index, nodes, nodeId, clusterState); + lockIndexAndAck(index, nodes, nodeId, clusterState, indexSettings); } }); } } - private void lockIndexAndAck(String index, DiscoveryNodes nodes, String nodeId, ClusterState clusterState) throws IOException { + private void lockIndexAndAck(String index, DiscoveryNodes nodes, String nodeId, ClusterState clusterState, Settings indexSettings) throws IOException { try { // we are waiting until we can lock the index / all shards on the node and then we ack the delete of the store to the // master. If we can't acquire the locks here immediately there might be a shard of this index still holding on to the lock // due to a "currently canceled recovery" or so. The shard will delete itself BEFORE the lock is released so it's guaranteed to be // deleted by the time we get the lock - indicesService.processPendingDeletes(new Index(index), new TimeValue(30, TimeUnit.MINUTES)); + indicesService.processPendingDeletes(new Index(index), indexSettings, new TimeValue(30, TimeUnit.MINUTES)); if (nodes.localNodeMaster()) { innerNodeIndexStoreDeleted(index, nodeId); } else { diff --git a/src/main/java/org/elasticsearch/env/NodeEnvironment.java b/src/main/java/org/elasticsearch/env/NodeEnvironment.java index 0dbd9985a88..38f723d7999 100644 --- a/src/main/java/org/elasticsearch/env/NodeEnvironment.java +++ b/src/main/java/org/elasticsearch/env/NodeEnvironment.java @@ -230,7 +230,7 @@ public class NodeEnvironment extends AbstractComponent implements Closeable{ public void deleteIndexDirectorySafe(Index index, long lockTimeoutMS, @IndexSettings Settings indexSettings) throws IOException { // This is to ensure someone doesn't use ImmutableSettings.EMPTY assert indexSettings != ImmutableSettings.EMPTY; - final List locks = lockAllForIndex(index, lockTimeoutMS); + final List locks = lockAllForIndex(index, indexSettings, lockTimeoutMS); try { final Path[] indexPaths = indexPaths(index); logger.trace("deleting index {} directory, paths({}): [{}]", index, indexPaths.length, indexPaths); @@ -255,16 +255,19 @@ public class NodeEnvironment extends AbstractComponent implements Closeable{ * @return the {@link ShardLock} instances for this index. * @throws IOException if an IOException occurs. */ - public List lockAllForIndex(Index index, long lockTimeoutMS) throws IOException { - Set allShardIds = findAllShardIds(index); - logger.trace("locking all shards for index {} - [{}]", index, allShardIds); - List allLocks = new ArrayList<>(allShardIds.size()); + public List lockAllForIndex(Index index, @IndexSettings Settings settings, long lockTimeoutMS) throws IOException { + final Integer numShards = settings.getAsInt(IndexMetaData.SETTING_NUMBER_OF_SHARDS, null); + if (numShards == null || numShards <= 0) { + throw new IllegalArgumentException("settings must contain a non-null > 0 number of shards"); + } + logger.trace("locking all shards for index {} - [{}]", index, numShards); + List allLocks = new ArrayList<>(numShards); boolean success = false; long startTime = System.currentTimeMillis(); try { - for (ShardId shardId : allShardIds) { + for (int i = 0; i < numShards; i++) { long timeoutLeft = Math.max(0, lockTimeoutMS - (System.currentTimeMillis() - startTime)); - allLocks.add(shardLock(shardId, timeoutLeft)); + allLocks.add(shardLock(new ShardId(index, i), timeoutLeft)); } success = true; } finally { diff --git a/src/main/java/org/elasticsearch/indices/IndicesService.java b/src/main/java/org/elasticsearch/indices/IndicesService.java index f21ac36c344..17d2827ce4b 100644 --- a/src/main/java/org/elasticsearch/indices/IndicesService.java +++ b/src/main/java/org/elasticsearch/indices/IndicesService.java @@ -640,9 +640,9 @@ public class IndicesService extends AbstractLifecycleComponent i * @param index the index to process the pending deletes for * @param timeout the timeout used for processing pending deletes */ - public void processPendingDeletes(Index index, TimeValue timeout) throws IOException { + public void processPendingDeletes(Index index, @IndexSettings Settings indexSettings, TimeValue timeout) throws IOException { final long startTime = System.currentTimeMillis(); - final List shardLocks = nodeEnv.lockAllForIndex(index, timeout.millis()); + final List shardLocks = nodeEnv.lockAllForIndex(index, indexSettings, timeout.millis()); try { Map locks = new HashMap<>(); for (ShardLock lock : shardLocks) { diff --git a/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java b/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java index b8972a6b82e..ddabe6a49bd 100644 --- a/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java +++ b/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java @@ -238,15 +238,19 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent locks = env.lockAllForIndex(new Index("foo"), randomIntBetween(0, 10)); + List locks = env.lockAllForIndex(new Index("foo"), settings, randomIntBetween(0, 10)); try { - env.shardLock(new ShardId("foo", randomBoolean() ? 1 : 2)); + env.shardLock(new ShardId("foo", randomIntBetween(0, 1))); fail("shard is locked"); } catch (LockObtainFailedException ex) { // expected @@ -151,33 +152,33 @@ public class NodeEnvironmentTests extends ElasticsearchTestCase { @Test public void testDeleteSafe() throws IOException, InterruptedException { final NodeEnvironment env = newNodeEnvironment(); - ShardLock fooLock = env.shardLock(new ShardId("foo", 1)); - assertEquals(new ShardId("foo", 1), fooLock.getShardId()); + ShardLock fooLock = env.shardLock(new ShardId("foo", 0)); + assertEquals(new ShardId("foo", 0), fooLock.getShardId()); for (Path path : env.indexPaths(new Index("foo"))) { + Files.createDirectories(path.resolve("0")); Files.createDirectories(path.resolve("1")); - Files.createDirectories(path.resolve("2")); } try { - env.deleteShardDirectorySafe(new ShardId("foo", 1), idxSettings); + env.deleteShardDirectorySafe(new ShardId("foo", 0), idxSettings); fail("shard is locked"); } catch (LockObtainFailedException ex) { // expected } for (Path path : env.indexPaths(new Index("foo"))) { + assertTrue(Files.exists(path.resolve("0"))); assertTrue(Files.exists(path.resolve("1"))); - assertTrue(Files.exists(path.resolve("2"))); } - env.deleteShardDirectorySafe(new ShardId("foo", 2), idxSettings); + env.deleteShardDirectorySafe(new ShardId("foo", 1), idxSettings); for (Path path : env.indexPaths(new Index("foo"))) { - assertTrue(Files.exists(path.resolve("1"))); - assertFalse(Files.exists(path.resolve("2"))); + assertTrue(Files.exists(path.resolve("0"))); + assertFalse(Files.exists(path.resolve("1"))); } try { @@ -209,7 +210,7 @@ public class NodeEnvironmentTests extends ElasticsearchTestCase { @Override protected void doRun() throws Exception { start.await(); - try (ShardLock _ = env.shardLock(new ShardId("foo", 1))) { + try (ShardLock _ = env.shardLock(new ShardId("foo", 0))) { blockLatch.countDown(); Thread.sleep(randomIntBetween(1, 10)); } diff --git a/src/test/java/org/elasticsearch/indices/IndicesServiceTest.java b/src/test/java/org/elasticsearch/indices/IndicesServiceTest.java index 2ebc8e2d573..653add44e1f 100644 --- a/src/test/java/org/elasticsearch/indices/IndicesServiceTest.java +++ b/src/test/java/org/elasticsearch/indices/IndicesServiceTest.java @@ -134,7 +134,7 @@ public class IndicesServiceTest extends ElasticsearchSingleNodeTest { assertTrue(test.hasShard(0)); Path[] paths = nodeEnc.shardDataPaths(new ShardId(test.index(), 0), test.getIndexSettings()); try { - indicesService.processPendingDeletes(test.index(), new TimeValue(0, TimeUnit.MILLISECONDS)); + indicesService.processPendingDeletes(test.index(), test.getIndexSettings(), new TimeValue(0, TimeUnit.MILLISECONDS)); fail("can't get lock"); } catch (LockObtainFailedException ex) { @@ -149,7 +149,7 @@ public class IndicesServiceTest extends ElasticsearchSingleNodeTest { } assertEquals(indicesService.numPendingDeletes(test.index()), 1); // shard lock released... we can now delete - indicesService.processPendingDeletes(test.index(), new TimeValue(0, TimeUnit.MILLISECONDS)); + indicesService.processPendingDeletes(test.index(), test.getIndexSettings(), new TimeValue(0, TimeUnit.MILLISECONDS)); assertEquals(indicesService.numPendingDeletes(test.index()), 0); for (Path p : paths) { assertFalse(Files.exists(p)); @@ -161,7 +161,7 @@ public class IndicesServiceTest extends ElasticsearchSingleNodeTest { indicesService.addPendingDelete(new Index("bogus"), new ShardId("bogus", 1), test.getIndexSettings()); assertEquals(indicesService.numPendingDeletes(test.index()), 2); // shard lock released... we can now delete - indicesService.processPendingDeletes(test.index(), new TimeValue(0, TimeUnit.MILLISECONDS)); + indicesService.processPendingDeletes(test.index(), test.getIndexSettings(), new TimeValue(0, TimeUnit.MILLISECONDS)); assertEquals(indicesService.numPendingDeletes(test.index()), 0); } assertAcked(client().admin().indices().prepareOpen("test"));