[ENV] NodeEnv should lock all shards for an index

Today, locking all shards for an index only locks the shards that are
present on the node or that still have a shard directory on disk. This
can lead to odd behavior if a shard of that index that doesn't exist on
the node yet gets allocated to it while all shards are supposed to be
locked.
Simon Willnauer 2015-02-20 21:44:39 +01:00
parent 94348cb788
commit 136d36b724
6 changed files with 47 additions and 39 deletions
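
The fix threads the index settings through to NodeEnvironment so that
lockAllForIndex can derive the shard ids to lock, 0 through
number_of_shards - 1, from the settings rather than from whatever shard
directories happen to exist on disk. Below is a minimal, standalone
sketch of that loop and its shared timeout budget; the ShardLocker
interface and the other names are simplified stand-ins, not the actual
Elasticsearch classes.

import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

final class LockAllSketch {

    // Stand-in for NodeEnvironment's per-shard locking; not the real Elasticsearch API.
    interface ShardLocker {
        Closeable shardLock(String index, int shardId, long timeoutMs) throws IOException;
    }

    // Lock shard ids 0..numShards-1 for the given index, sharing one timeout budget across all locks.
    static List<Closeable> lockAllForIndex(ShardLocker locker, String index,
                                           Integer numShards, long lockTimeoutMs) throws IOException {
        if (numShards == null || numShards <= 0) {
            throw new IllegalArgumentException("settings must contain a non-null > 0 number of shards");
        }
        final List<Closeable> locks = new ArrayList<>(numShards);
        boolean success = false;
        final long startTime = System.currentTimeMillis();
        try {
            for (int i = 0; i < numShards; i++) {
                // every subsequent lock only gets whatever is left of the overall budget
                long timeoutLeft = Math.max(0, lockTimeoutMs - (System.currentTimeMillis() - startTime));
                locks.add(locker.shardLock(index, i, timeoutLeft));
            }
            success = true;
        } finally {
            if (success == false) {
                // release whatever was locked before the failure
                for (Closeable lock : locks) {
                    try {
                        lock.close();
                    } catch (IOException ignored) {
                        // best-effort cleanup
                    }
                }
            }
        }
        return locks;
    }
}

The single startTime/timeoutLeft budget means the caller's lockTimeoutMS
bounds the total time spent acquiring all shard locks, not the time
spent per shard.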

@@ -74,7 +74,7 @@ public class NodeIndexDeletedAction extends AbstractComponent {
listeners.remove(listener);
}
public void nodeIndexDeleted(final ClusterState clusterState, final String index, final String nodeId) throws ElasticsearchException {
public void nodeIndexDeleted(final ClusterState clusterState, final String index, final Settings indexSettings, final String nodeId) throws ElasticsearchException {
final DiscoveryNodes nodes = clusterState.nodes();
if (nodes.localNodeMaster()) {
threadPool.generic().execute(new AbstractRunnable() {
@@ -91,7 +91,7 @@ public class NodeIndexDeletedAction extends AbstractComponent {
logger.trace("[{}] not acking store deletion (not a data node)");
return;
}
lockIndexAndAck(index, nodes, nodeId, clusterState);
lockIndexAndAck(index, nodes, nodeId, clusterState, indexSettings);
}
});
@@ -110,19 +110,19 @@ public class NodeIndexDeletedAction extends AbstractComponent {
@Override
protected void doRun() throws Exception {
lockIndexAndAck(index, nodes, nodeId, clusterState);
lockIndexAndAck(index, nodes, nodeId, clusterState, indexSettings);
}
});
}
}
private void lockIndexAndAck(String index, DiscoveryNodes nodes, String nodeId, ClusterState clusterState) throws IOException {
private void lockIndexAndAck(String index, DiscoveryNodes nodes, String nodeId, ClusterState clusterState, Settings indexSettings) throws IOException {
try {
// we are waiting until we can lock the index / all shards on the node and then we ack the delete of the store to the
// master. If we can't acquire the locks here immediately there might be a shard of this index still holding on to the lock
// due to a "currently canceled recovery" or so. The shard will delete itself BEFORE the lock is released so it's guaranteed to be
// deleted by the time we get the lock
indicesService.processPendingDeletes(new Index(index), new TimeValue(30, TimeUnit.MINUTES));
indicesService.processPendingDeletes(new Index(index), indexSettings, new TimeValue(30, TimeUnit.MINUTES));
if (nodes.localNodeMaster()) {
innerNodeIndexStoreDeleted(index, nodeId);
} else {

@@ -230,7 +230,7 @@ public class NodeEnvironment extends AbstractComponent implements Closeable{
public void deleteIndexDirectorySafe(Index index, long lockTimeoutMS, @IndexSettings Settings indexSettings) throws IOException {
// This is to ensure someone doesn't use ImmutableSettings.EMPTY
assert indexSettings != ImmutableSettings.EMPTY;
final List<ShardLock> locks = lockAllForIndex(index, lockTimeoutMS);
final List<ShardLock> locks = lockAllForIndex(index, indexSettings, lockTimeoutMS);
try {
final Path[] indexPaths = indexPaths(index);
logger.trace("deleting index {} directory, paths({}): [{}]", index, indexPaths.length, indexPaths);
@@ -255,16 +255,19 @@ public class NodeEnvironment extends AbstractComponent implements Closeable{
* @return the {@link ShardLock} instances for this index.
* @throws IOException if an IOException occurs.
*/
public List<ShardLock> lockAllForIndex(Index index, long lockTimeoutMS) throws IOException {
Set<ShardId> allShardIds = findAllShardIds(index);
logger.trace("locking all shards for index {} - [{}]", index, allShardIds);
List<ShardLock> allLocks = new ArrayList<>(allShardIds.size());
public List<ShardLock> lockAllForIndex(Index index, @IndexSettings Settings settings, long lockTimeoutMS) throws IOException {
final Integer numShards = settings.getAsInt(IndexMetaData.SETTING_NUMBER_OF_SHARDS, null);
if (numShards == null || numShards <= 0) {
throw new IllegalArgumentException("settings must contain a non-null > 0 number of shards");
}
logger.trace("locking all shards for index {} - [{}]", index, numShards);
List<ShardLock> allLocks = new ArrayList<>(numShards);
boolean success = false;
long startTime = System.currentTimeMillis();
try {
for (ShardId shardId : allShardIds) {
for (int i = 0; i < numShards; i++) {
long timeoutLeft = Math.max(0, lockTimeoutMS - (System.currentTimeMillis() - startTime));
allLocks.add(shardLock(shardId, timeoutLeft));
allLocks.add(shardLock(new ShardId(index, i), timeoutLeft));
}
success = true;
} finally {
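
For callers, the settings object now has to carry the shard count. A
usage sketch assembled from the identifiers visible elsewhere in this
diff; the index name "foo", the shard count of 3 and the 5000 ms
timeout are illustrative only.

Settings settings = ImmutableSettings.settingsBuilder()
        .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 3)
        .build();
List<ShardLock> locks = nodeEnv.lockAllForIndex(new Index("foo"), settings, 5000);
try {
    // shard ids 0, 1 and 2 are locked here, even for shards that have no directory on disk yet
} finally {
    for (ShardLock lock : locks) {
        lock.close();
    }
}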

@@ -640,9 +640,9 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService> i
* @param index the index to process the pending deletes for
* @param timeout the timeout used for processing pending deletes
*/
public void processPendingDeletes(Index index, TimeValue timeout) throws IOException {
public void processPendingDeletes(Index index, @IndexSettings Settings indexSettings, TimeValue timeout) throws IOException {
final long startTime = System.currentTimeMillis();
final List<ShardLock> shardLocks = nodeEnv.lockAllForIndex(index, timeout.millis());
final List<ShardLock> shardLocks = nodeEnv.lockAllForIndex(index, indexSettings, timeout.millis());
try {
Map<ShardId, ShardLock> locks = new HashMap<>();
for (ShardLock lock : shardLocks) {

@@ -238,15 +238,19 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
if (logger.isDebugEnabled()) {
logger.debug("[{}] cleaning index, no longer part of the metadata", index);
}
if (indicesService.hasIndex(index)) {
final Settings indexSettings;
final IndexService idxService = indicesService.indexService(index);
if (idxService != null) {
indexSettings = idxService.getIndexSettings();
deleteIndex(index, "index no longer part of the metadata");
} else {
IndexMetaData metaData = previousState.metaData().index(index);
final IndexMetaData metaData = previousState.metaData().index(index);
assert metaData != null;
indexSettings = metaData.settings();
indicesService.deleteClosedIndex("closed index no longer part of the metadata", metaData);
}
try {
nodeIndexDeletedAction.nodeIndexDeleted(event.state(), index, localNodeId);
nodeIndexDeletedAction.nodeIndexDeleted(event.state(), index, indexSettings, localNodeId);
} catch (Throwable e) {
logger.debug("failed to send to master index {} deleted event", e, index);
}

@@ -43,6 +43,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS;
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
import static org.hamcrest.CoreMatchers.equalTo;
public class NodeEnvironmentTests extends ElasticsearchTestCase {
@@ -93,34 +94,34 @@ public class NodeEnvironmentTests extends ElasticsearchTestCase {
public void testShardLock() throws IOException {
final NodeEnvironment env = newNodeEnvironment();
ShardLock fooLock = env.shardLock(new ShardId("foo", 1));
assertEquals(new ShardId("foo", 1), fooLock.getShardId());
ShardLock fooLock = env.shardLock(new ShardId("foo", 0));
assertEquals(new ShardId("foo", 0), fooLock.getShardId());
try {
env.shardLock(new ShardId("foo", 1));
env.shardLock(new ShardId("foo", 0));
fail("shard is locked");
} catch (LockObtainFailedException ex) {
// expected
}
for (Path path : env.indexPaths(new Index("foo"))) {
Files.createDirectories(path.resolve("0"));
Files.createDirectories(path.resolve("1"));
Files.createDirectories(path.resolve("2"));
}
Settings settings = settingsBuilder().put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, randomIntBetween(1, 10)).build();
try {
env.lockAllForIndex(new Index("foo"), randomIntBetween(0, 10));
fail("shard 1 is locked");
env.lockAllForIndex(new Index("foo"), settings, randomIntBetween(0, 10));
fail("shard 0 is locked");
} catch (LockObtainFailedException ex) {
// expected
}
fooLock.close();
// can lock again?
env.shardLock(new ShardId("foo", 1)).close();
env.shardLock(new ShardId("foo", 0)).close();
List<ShardLock> locks = env.lockAllForIndex(new Index("foo"), randomIntBetween(0, 10));
List<ShardLock> locks = env.lockAllForIndex(new Index("foo"), settings, randomIntBetween(0, 10));
try {
env.shardLock(new ShardId("foo", randomBoolean() ? 1 : 2));
env.shardLock(new ShardId("foo", randomIntBetween(0, 1)));
fail("shard is locked");
} catch (LockObtainFailedException ex) {
// expected
@@ -151,33 +152,33 @@ public class NodeEnvironmentTests extends ElasticsearchTestCase {
@Test
public void testDeleteSafe() throws IOException, InterruptedException {
final NodeEnvironment env = newNodeEnvironment();
ShardLock fooLock = env.shardLock(new ShardId("foo", 1));
assertEquals(new ShardId("foo", 1), fooLock.getShardId());
ShardLock fooLock = env.shardLock(new ShardId("foo", 0));
assertEquals(new ShardId("foo", 0), fooLock.getShardId());
for (Path path : env.indexPaths(new Index("foo"))) {
Files.createDirectories(path.resolve("0"));
Files.createDirectories(path.resolve("1"));
Files.createDirectories(path.resolve("2"));
}
try {
env.deleteShardDirectorySafe(new ShardId("foo", 1), idxSettings);
env.deleteShardDirectorySafe(new ShardId("foo", 0), idxSettings);
fail("shard is locked");
} catch (LockObtainFailedException ex) {
// expected
}
for (Path path : env.indexPaths(new Index("foo"))) {
assertTrue(Files.exists(path.resolve("0")));
assertTrue(Files.exists(path.resolve("1")));
assertTrue(Files.exists(path.resolve("2")));
}
env.deleteShardDirectorySafe(new ShardId("foo", 2), idxSettings);
env.deleteShardDirectorySafe(new ShardId("foo", 1), idxSettings);
for (Path path : env.indexPaths(new Index("foo"))) {
assertTrue(Files.exists(path.resolve("1")));
assertFalse(Files.exists(path.resolve("2")));
assertTrue(Files.exists(path.resolve("0")));
assertFalse(Files.exists(path.resolve("1")));
}
try {
@@ -209,7 +210,7 @@ public class NodeEnvironmentTests extends ElasticsearchTestCase {
@Override
protected void doRun() throws Exception {
start.await();
try (ShardLock _ = env.shardLock(new ShardId("foo", 1))) {
try (ShardLock _ = env.shardLock(new ShardId("foo", 0))) {
blockLatch.countDown();
Thread.sleep(randomIntBetween(1, 10));
}

@@ -134,7 +134,7 @@ public class IndicesServiceTest extends ElasticsearchSingleNodeTest {
assertTrue(test.hasShard(0));
Path[] paths = nodeEnc.shardDataPaths(new ShardId(test.index(), 0), test.getIndexSettings());
try {
indicesService.processPendingDeletes(test.index(), new TimeValue(0, TimeUnit.MILLISECONDS));
indicesService.processPendingDeletes(test.index(), test.getIndexSettings(), new TimeValue(0, TimeUnit.MILLISECONDS));
fail("can't get lock");
} catch (LockObtainFailedException ex) {
@@ -149,7 +149,7 @@ public class IndicesServiceTest extends ElasticsearchSingleNodeTest {
}
assertEquals(indicesService.numPendingDeletes(test.index()), 1);
// shard lock released... we can now delete
indicesService.processPendingDeletes(test.index(), new TimeValue(0, TimeUnit.MILLISECONDS));
indicesService.processPendingDeletes(test.index(), test.getIndexSettings(), new TimeValue(0, TimeUnit.MILLISECONDS));
assertEquals(indicesService.numPendingDeletes(test.index()), 0);
for (Path p : paths) {
assertFalse(Files.exists(p));
@@ -161,7 +161,7 @@ public class IndicesServiceTest extends ElasticsearchSingleNodeTest {
indicesService.addPendingDelete(new Index("bogus"), new ShardId("bogus", 1), test.getIndexSettings());
assertEquals(indicesService.numPendingDeletes(test.index()), 2);
// shard lock released... we can now delete
indicesService.processPendingDeletes(test.index(), new TimeValue(0, TimeUnit.MILLISECONDS));
indicesService.processPendingDeletes(test.index(), test.getIndexSettings(), new TimeValue(0, TimeUnit.MILLISECONDS));
assertEquals(indicesService.numPendingDeletes(test.index()), 0);
}
assertAcked(client().admin().indices().prepareOpen("test"));