diff --git a/core/src/main/java/org/elasticsearch/env/NodeEnvironment.java b/core/src/main/java/org/elasticsearch/env/NodeEnvironment.java index 59ef122760b..4ddf0e38b75 100644 --- a/core/src/main/java/org/elasticsearch/env/NodeEnvironment.java +++ b/core/src/main/java/org/elasticsearch/env/NodeEnvironment.java @@ -433,7 +433,7 @@ public final class NodeEnvironment implements Closeable { * @param shardId the id of the shard to delete to delete * @throws IOException if an IOException occurs */ - public void deleteShardDirectorySafe(ShardId shardId, IndexSettings indexSettings) throws IOException { + public void deleteShardDirectorySafe(ShardId shardId, IndexSettings indexSettings) throws IOException, ShardLockObtainFailedException { final Path[] paths = availableShardPaths(shardId); logger.trace("deleting shard {} directory, paths: [{}]", shardId, paths); try (ShardLock lock = shardLock(shardId)) { @@ -462,7 +462,7 @@ public final class NodeEnvironment implements Closeable { locks[i] = dirs[i].obtainLock(IndexWriter.WRITE_LOCK_NAME); } catch (IOException ex) { throw new LockObtainFailedException("unable to acquire " + - IndexWriter.WRITE_LOCK_NAME + " for " + p); + IndexWriter.WRITE_LOCK_NAME + " for " + p, ex); } } } finally { @@ -504,7 +504,7 @@ public final class NodeEnvironment implements Closeable { try { shardLock(id, 0).close(); return false; - } catch (IOException ex) { + } catch (ShardLockObtainFailedException ex) { return true; } } @@ -519,7 +519,8 @@ public final class NodeEnvironment implements Closeable { * @param indexSettings settings for the index being deleted * @throws IOException if any of the shards data directories can't be locked or deleted */ - public void deleteIndexDirectorySafe(Index index, long lockTimeoutMS, IndexSettings indexSettings) throws IOException { + public void deleteIndexDirectorySafe(Index index, long lockTimeoutMS, IndexSettings indexSettings) + throws IOException, ShardLockObtainFailedException { final List locks = lockAllForIndex(index, indexSettings, lockTimeoutMS); try { deleteIndexDirectoryUnderLock(index, indexSettings); @@ -549,14 +550,15 @@ public final class NodeEnvironment implements Closeable { /** * Tries to lock all local shards for the given index. If any of the shard locks can't be acquired - * an {@link LockObtainFailedException} is thrown and all previously acquired locks are released. + * a {@link ShardLockObtainFailedException} is thrown and all previously acquired locks are released. * * @param index the index to lock shards for * @param lockTimeoutMS how long to wait for acquiring the indices shard locks * @return the {@link ShardLock} instances for this index. * @throws IOException if an IOException occurs. */ - public List lockAllForIndex(Index index, IndexSettings settings, long lockTimeoutMS) throws IOException { + public List lockAllForIndex(Index index, IndexSettings settings, long lockTimeoutMS) + throws IOException, ShardLockObtainFailedException { final int numShards = settings.getNumberOfShards(); if (numShards <= 0) { throw new IllegalArgumentException("settings must contain a non-null > 0 number of shards"); @@ -584,15 +586,14 @@ public final class NodeEnvironment implements Closeable { * Tries to lock the given shards ID. A shard lock is required to perform any kind of * write operation on a shards data directory like deleting files, creating a new index writer * or recover from a different shard instance into it. If the shard lock can not be acquired - * an {@link LockObtainFailedException} is thrown. + * a {@link ShardLockObtainFailedException} is thrown. * * Note: this method will return immediately if the lock can't be acquired. * * @param id the shard ID to lock * @return the shard lock. Call {@link ShardLock#close()} to release the lock - * @throws IOException if an IOException occurs. */ - public ShardLock shardLock(ShardId id) throws IOException { + public ShardLock shardLock(ShardId id) throws ShardLockObtainFailedException { return shardLock(id, 0); } @@ -600,13 +601,12 @@ public final class NodeEnvironment implements Closeable { * Tries to lock the given shards ID. A shard lock is required to perform any kind of * write operation on a shards data directory like deleting files, creating a new index writer * or recover from a different shard instance into it. If the shard lock can not be acquired - * an {@link org.apache.lucene.store.LockObtainFailedException} is thrown + * a {@link ShardLockObtainFailedException} is thrown * @param shardId the shard ID to lock * @param lockTimeoutMS the lock timeout in milliseconds * @return the shard lock. Call {@link ShardLock#close()} to release the lock - * @throws IOException if an IOException occurs. */ - public ShardLock shardLock(final ShardId shardId, long lockTimeoutMS) throws IOException { + public ShardLock shardLock(final ShardId shardId, long lockTimeoutMS) throws ShardLockObtainFailedException { logger.trace("acquiring node shardlock on [{}], timeout [{}]", shardId, lockTimeoutMS); final InternalShardLock shardLock; final boolean acquired; @@ -647,8 +647,7 @@ public final class NodeEnvironment implements Closeable { */ @FunctionalInterface public interface ShardLocker { - - ShardLock lock(ShardId shardId, long lockTimeoutMS) throws IOException; + ShardLock lock(ShardId shardId, long lockTimeoutMS) throws ShardLockObtainFailedException; } /** @@ -703,14 +702,15 @@ public final class NodeEnvironment implements Closeable { } } - void acquire(long timeoutInMillis) throws LockObtainFailedException{ + void acquire(long timeoutInMillis) throws ShardLockObtainFailedException { try { if (mutex.tryAcquire(timeoutInMillis, TimeUnit.MILLISECONDS) == false) { - throw new LockObtainFailedException("Can't lock shard " + shardId + ", timed out after " + timeoutInMillis + "ms"); + throw new ShardLockObtainFailedException(shardId, + "obtaining shard lock timed out after " + timeoutInMillis + "ms"); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); - throw new LockObtainFailedException("Can't lock shard " + shardId + ", interrupted", e); + throw new ShardLockObtainFailedException(shardId, "thread interrupted while trying to obtain shard lock", e); } } } diff --git a/core/src/main/java/org/elasticsearch/env/ShardLock.java b/core/src/main/java/org/elasticsearch/env/ShardLock.java index 4ff1237ba20..99dd973b4ce 100644 --- a/core/src/main/java/org/elasticsearch/env/ShardLock.java +++ b/core/src/main/java/org/elasticsearch/env/ShardLock.java @@ -49,7 +49,7 @@ public abstract class ShardLock implements Closeable { } @Override - public final void close() throws IOException { + public final void close() { if (this.closed.compareAndSet(false, true)) { closeInternal(); } diff --git a/core/src/main/java/org/elasticsearch/env/ShardLockObtainFailedException.java b/core/src/main/java/org/elasticsearch/env/ShardLockObtainFailedException.java new file mode 100644 index 00000000000..1c113e1abea --- /dev/null +++ b/core/src/main/java/org/elasticsearch/env/ShardLockObtainFailedException.java @@ -0,0 +1,48 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.env; + +import org.elasticsearch.index.shard.ShardId; + +/** + * Exception used when the in-memory lock for a shard cannot be obtained + */ +public class ShardLockObtainFailedException extends Exception { + private final ShardId shardId; + + public ShardLockObtainFailedException(ShardId shardId, String message) { + super(message); + this.shardId = shardId; + } + + public ShardLockObtainFailedException(ShardId shardId, String message, Throwable cause) { + super(message, cause); + this.shardId = shardId; + } + + @Override + public String getMessage() { + StringBuffer sb = new StringBuffer(); + sb.append(shardId.toString()); + sb.append(": "); + sb.append(super.getMessage()); + return sb.toString(); + } +} diff --git a/core/src/main/java/org/elasticsearch/index/IndexService.java b/core/src/main/java/org/elasticsearch/index/IndexService.java index 0ba37418792..b9f93bf2ac4 100644 --- a/core/src/main/java/org/elasticsearch/index/IndexService.java +++ b/core/src/main/java/org/elasticsearch/index/IndexService.java @@ -39,6 +39,7 @@ import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.env.ShardLock; +import org.elasticsearch.env.ShardLockObtainFailedException; import org.elasticsearch.index.analysis.AnalysisRegistry; import org.elasticsearch.index.analysis.AnalysisService; import org.elasticsearch.index.cache.IndexCache; @@ -279,8 +280,9 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust boolean success = false; Store store = null; IndexShard indexShard = null; - final ShardLock lock = nodeEnv.shardLock(shardId, TimeUnit.SECONDS.toMillis(5)); + ShardLock lock = null; try { + lock = nodeEnv.shardLock(shardId, TimeUnit.SECONDS.toMillis(5)); eventListener.beforeIndexShardCreated(shardId, indexSettings); ShardPath path; try { @@ -349,9 +351,13 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust shards = newMapBuilder(shards).put(shardId.id(), indexShard).immutableMap(); success = true; return indexShard; + } catch (ShardLockObtainFailedException e) { + throw new IOException("failed to obtain in-memory shard lock", e); } finally { if (success == false) { - IOUtils.closeWhileHandlingException(lock); + if (lock != null) { + IOUtils.closeWhileHandlingException(lock); + } closeShard("initialization failed", shardId, indexShard, store, eventListener); } } diff --git a/core/src/main/java/org/elasticsearch/index/store/Store.java b/core/src/main/java/org/elasticsearch/index/store/Store.java index 659b230edab..e714d8db8b6 100644 --- a/core/src/main/java/org/elasticsearch/index/store/Store.java +++ b/core/src/main/java/org/elasticsearch/index/store/Store.java @@ -70,6 +70,7 @@ import org.elasticsearch.common.util.concurrent.RefCounted; import org.elasticsearch.common.util.iterable.Iterables; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.env.ShardLock; +import org.elasticsearch.env.ShardLockObtainFailedException; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.shard.AbstractIndexShardComponent; @@ -388,6 +389,8 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref // that's fine - happens all the time no need to log } catch (FileNotFoundException | NoSuchFileException ex) { logger.info("Failed to open / find files while reading metadata snapshot"); + } catch (ShardLockObtainFailedException ex) { + logger.info("{}: failed to obtain shard lock", ex, shardId); } return MetadataSnapshot.EMPTY; } @@ -418,6 +421,9 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref failIfCorrupted(dir, shardId); SegmentInfos segInfo = Lucene.readSegmentInfos(dir); logger.trace("{} loaded segment info [{}]", shardId, segInfo); + } catch (ShardLockObtainFailedException ex) { + logger.error("{} unable to acquire shard lock", ex, shardId); + throw new IOException(ex); } } diff --git a/core/src/main/java/org/elasticsearch/indices/IndicesService.java b/core/src/main/java/org/elasticsearch/indices/IndicesService.java index d5a0da1be4b..7519494c39c 100644 --- a/core/src/main/java/org/elasticsearch/indices/IndicesService.java +++ b/core/src/main/java/org/elasticsearch/indices/IndicesService.java @@ -62,6 +62,7 @@ import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.iterable.Iterables; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.env.ShardLock; +import org.elasticsearch.env.ShardLockObtainFailedException; import org.elasticsearch.gateway.MetaDataStateFormat; import org.elasticsearch.gateway.MetaStateService; import org.elasticsearch.index.Index; @@ -676,7 +677,8 @@ public class IndicesService extends AbstractLifecycleComponent * @param clusterState . This is required to access the indexes settings etc. * @throws IOException if an IOException occurs */ - public void deleteShardStore(String reason, ShardId shardId, ClusterState clusterState) throws IOException { + public void deleteShardStore(String reason, ShardId shardId, ClusterState clusterState) + throws IOException, ShardLockObtainFailedException { final IndexMetaData metaData = clusterState.getMetaData().indices().get(shardId.getIndexName()); final IndexSettings indexSettings = buildIndexSettings(metaData); @@ -891,7 +893,8 @@ public class IndicesService extends AbstractLifecycleComponent * @param timeout the timeout used for processing pending deletes */ @Override - public void processPendingDeletes(Index index, IndexSettings indexSettings, TimeValue timeout) throws IOException, InterruptedException { + public void processPendingDeletes(Index index, IndexSettings indexSettings, TimeValue timeout) + throws IOException, InterruptedException, ShardLockObtainFailedException { logger.debug("{} processing pending deletes", index); final long startTimeNS = System.nanoTime(); final List shardLocks = nodeEnv.lockAllForIndex(index, indexSettings, timeout.millis()); diff --git a/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java b/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java index f01a09d4608..fd77722f86b 100644 --- a/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java +++ b/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java @@ -42,6 +42,7 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.Callback; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; +import org.elasticsearch.env.ShardLockObtainFailedException; import org.elasticsearch.gateway.GatewayService; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexComponent; @@ -835,6 +836,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple return null; } - void processPendingDeletes(Index index, IndexSettings indexSettings, TimeValue timeValue) throws IOException, InterruptedException; + void processPendingDeletes(Index index, IndexSettings indexSettings, TimeValue timeValue) + throws IOException, InterruptedException, ShardLockObtainFailedException; } } diff --git a/core/src/test/java/org/elasticsearch/env/NodeEnvironmentTests.java b/core/src/test/java/org/elasticsearch/env/NodeEnvironmentTests.java index ee403bfe910..e5c1c53dad7 100644 --- a/core/src/test/java/org/elasticsearch/env/NodeEnvironmentTests.java +++ b/core/src/test/java/org/elasticsearch/env/NodeEnvironmentTests.java @@ -19,7 +19,6 @@ package org.elasticsearch.env; import org.apache.lucene.index.SegmentInfos; -import org.apache.lucene.store.LockObtainFailedException; import org.apache.lucene.util.IOUtils; import org.apache.lucene.util.LuceneTestCase; import org.elasticsearch.cluster.metadata.IndexMetaData; @@ -129,7 +128,7 @@ public class NodeEnvironmentTests extends ESTestCase { IOUtils.close(first, second); } - public void testShardLock() throws IOException { + public void testShardLock() throws Exception { final NodeEnvironment env = newNodeEnvironment(); Index index = new Index("foo", "fooUUID"); @@ -139,7 +138,7 @@ public class NodeEnvironmentTests extends ESTestCase { try { env.shardLock(new ShardId(index, 0)); fail("shard is locked"); - } catch (LockObtainFailedException ex) { + } catch (ShardLockObtainFailedException ex) { // expected } for (Path path : env.indexPaths(index)) { @@ -149,7 +148,7 @@ public class NodeEnvironmentTests extends ESTestCase { try { env.lockAllForIndex(index, idxSettings, randomIntBetween(0, 10)); fail("shard 0 is locked"); - } catch (LockObtainFailedException ex) { + } catch (ShardLockObtainFailedException ex) { // expected } @@ -161,7 +160,7 @@ public class NodeEnvironmentTests extends ESTestCase { try { env.shardLock(new ShardId(index, 0)); fail("shard is locked"); - } catch (LockObtainFailedException ex) { + } catch (ShardLockObtainFailedException ex) { // expected } IOUtils.close(locks); @@ -213,13 +212,12 @@ public class NodeEnvironmentTests extends ESTestCase { env.close(); } - public void testDeleteSafe() throws IOException, InterruptedException { + public void testDeleteSafe() throws Exception { final NodeEnvironment env = newNodeEnvironment(); final Index index = new Index("foo", "fooUUID"); ShardLock fooLock = env.shardLock(new ShardId(index, 0)); assertEquals(new ShardId(index, 0), fooLock.getShardId()); - for (Path path : env.indexPaths(index)) { Files.createDirectories(path.resolve("0")); Files.createDirectories(path.resolve("1")); @@ -228,14 +226,13 @@ public class NodeEnvironmentTests extends ESTestCase { try { env.deleteShardDirectorySafe(new ShardId(index, 0), idxSettings); fail("shard is locked"); - } catch (LockObtainFailedException ex) { + } catch (ShardLockObtainFailedException ex) { // expected } for (Path path : env.indexPaths(index)) { assertTrue(Files.exists(path.resolve("0"))); assertTrue(Files.exists(path.resolve("1"))); - } env.deleteShardDirectorySafe(new ShardId(index, 1), idxSettings); @@ -248,7 +245,7 @@ public class NodeEnvironmentTests extends ESTestCase { try { env.deleteIndexDirectorySafe(index, randomIntBetween(0, 10), idxSettings); fail("shard is locked"); - } catch (LockObtainFailedException ex) { + } catch (ShardLockObtainFailedException ex) { // expected } fooLock.close(); @@ -338,10 +335,8 @@ public class NodeEnvironmentTests extends ESTestCase { assertEquals(flipFlop[shard].incrementAndGet(), 1); assertEquals(flipFlop[shard].decrementAndGet(), 0); } - } catch (LockObtainFailedException ex) { + } catch (ShardLockObtainFailedException ex) { // ok - } catch (IOException ex) { - fail(ex.toString()); } } } diff --git a/core/src/test/java/org/elasticsearch/indices/IndicesServiceTests.java b/core/src/test/java/org/elasticsearch/indices/IndicesServiceTests.java index 27bb2cccff4..216ddf76f6e 100644 --- a/core/src/test/java/org/elasticsearch/indices/IndicesServiceTests.java +++ b/core/src/test/java/org/elasticsearch/indices/IndicesServiceTests.java @@ -33,6 +33,7 @@ import org.elasticsearch.common.io.FileSystemUtils; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.env.NodeEnvironment; +import org.elasticsearch.env.ShardLockObtainFailedException; import org.elasticsearch.gateway.GatewayMetaState; import org.elasticsearch.gateway.LocalAllocateDangledIndices; import org.elasticsearch.gateway.MetaStateService; @@ -172,7 +173,7 @@ public class IndicesServiceTests extends ESSingleNodeTestCase { try { indicesService.processPendingDeletes(test.index(), test.getIndexSettings(), new TimeValue(0, TimeUnit.MILLISECONDS)); fail("can't get lock"); - } catch (LockObtainFailedException ex) { + } catch (ShardLockObtainFailedException ex) { } assertTrue(path.exists()); diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java index cc78afb987f..6fc612a60c9 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java +++ b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java @@ -67,6 +67,7 @@ import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.discovery.DiscoverySettings; import org.elasticsearch.env.Environment; import org.elasticsearch.env.NodeEnvironment; +import org.elasticsearch.env.ShardLockObtainFailedException; import org.elasticsearch.http.HttpServerTransport; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexService; @@ -1912,7 +1913,7 @@ public final class InternalTestCluster extends TestCluster { for (ShardId id : shardIds) { try { env.shardLock(id, TimeUnit.SECONDS.toMillis(5)).close(); - } catch (IOException ex) { + } catch (ShardLockObtainFailedException ex) { fail("Shard " + id + " is still locked after 5 sec waiting"); } }