Switching LockObtainFailedException over to ShardLockObtainFailedException
`LobObtainFailedException` should be reserved for on-disk locks that Lucene attempts (like `write.lock`). This switches our in-memory semaphore locks for shards to use a different exception. Additionally, ShardLockObtainFailedException no longer subclasses IOException, since no IO is being done is this case. Resolves #19978
This commit is contained in:
parent
d94e388904
commit
1de3388fa3
|
@ -433,7 +433,7 @@ public final class NodeEnvironment implements Closeable {
|
||||||
* @param shardId the id of the shard to delete to delete
|
* @param shardId the id of the shard to delete to delete
|
||||||
* @throws IOException if an IOException occurs
|
* @throws IOException if an IOException occurs
|
||||||
*/
|
*/
|
||||||
public void deleteShardDirectorySafe(ShardId shardId, IndexSettings indexSettings) throws IOException {
|
public void deleteShardDirectorySafe(ShardId shardId, IndexSettings indexSettings) throws IOException, ShardLockObtainFailedException {
|
||||||
final Path[] paths = availableShardPaths(shardId);
|
final Path[] paths = availableShardPaths(shardId);
|
||||||
logger.trace("deleting shard {} directory, paths: [{}]", shardId, paths);
|
logger.trace("deleting shard {} directory, paths: [{}]", shardId, paths);
|
||||||
try (ShardLock lock = shardLock(shardId)) {
|
try (ShardLock lock = shardLock(shardId)) {
|
||||||
|
@ -462,7 +462,7 @@ public final class NodeEnvironment implements Closeable {
|
||||||
locks[i] = dirs[i].obtainLock(IndexWriter.WRITE_LOCK_NAME);
|
locks[i] = dirs[i].obtainLock(IndexWriter.WRITE_LOCK_NAME);
|
||||||
} catch (IOException ex) {
|
} catch (IOException ex) {
|
||||||
throw new LockObtainFailedException("unable to acquire " +
|
throw new LockObtainFailedException("unable to acquire " +
|
||||||
IndexWriter.WRITE_LOCK_NAME + " for " + p);
|
IndexWriter.WRITE_LOCK_NAME + " for " + p, ex);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
|
@ -504,7 +504,7 @@ public final class NodeEnvironment implements Closeable {
|
||||||
try {
|
try {
|
||||||
shardLock(id, 0).close();
|
shardLock(id, 0).close();
|
||||||
return false;
|
return false;
|
||||||
} catch (IOException ex) {
|
} catch (ShardLockObtainFailedException ex) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -519,7 +519,8 @@ public final class NodeEnvironment implements Closeable {
|
||||||
* @param indexSettings settings for the index being deleted
|
* @param indexSettings settings for the index being deleted
|
||||||
* @throws IOException if any of the shards data directories can't be locked or deleted
|
* @throws IOException if any of the shards data directories can't be locked or deleted
|
||||||
*/
|
*/
|
||||||
public void deleteIndexDirectorySafe(Index index, long lockTimeoutMS, IndexSettings indexSettings) throws IOException {
|
public void deleteIndexDirectorySafe(Index index, long lockTimeoutMS, IndexSettings indexSettings)
|
||||||
|
throws IOException, ShardLockObtainFailedException {
|
||||||
final List<ShardLock> locks = lockAllForIndex(index, indexSettings, lockTimeoutMS);
|
final List<ShardLock> locks = lockAllForIndex(index, indexSettings, lockTimeoutMS);
|
||||||
try {
|
try {
|
||||||
deleteIndexDirectoryUnderLock(index, indexSettings);
|
deleteIndexDirectoryUnderLock(index, indexSettings);
|
||||||
|
@ -549,14 +550,15 @@ public final class NodeEnvironment implements Closeable {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Tries to lock all local shards for the given index. If any of the shard locks can't be acquired
|
* Tries to lock all local shards for the given index. If any of the shard locks can't be acquired
|
||||||
* an {@link LockObtainFailedException} is thrown and all previously acquired locks are released.
|
* a {@link ShardLockObtainFailedException} is thrown and all previously acquired locks are released.
|
||||||
*
|
*
|
||||||
* @param index the index to lock shards for
|
* @param index the index to lock shards for
|
||||||
* @param lockTimeoutMS how long to wait for acquiring the indices shard locks
|
* @param lockTimeoutMS how long to wait for acquiring the indices shard locks
|
||||||
* @return the {@link ShardLock} instances for this index.
|
* @return the {@link ShardLock} instances for this index.
|
||||||
* @throws IOException if an IOException occurs.
|
* @throws IOException if an IOException occurs.
|
||||||
*/
|
*/
|
||||||
public List<ShardLock> lockAllForIndex(Index index, IndexSettings settings, long lockTimeoutMS) throws IOException {
|
public List<ShardLock> lockAllForIndex(Index index, IndexSettings settings, long lockTimeoutMS)
|
||||||
|
throws IOException, ShardLockObtainFailedException {
|
||||||
final int numShards = settings.getNumberOfShards();
|
final int numShards = settings.getNumberOfShards();
|
||||||
if (numShards <= 0) {
|
if (numShards <= 0) {
|
||||||
throw new IllegalArgumentException("settings must contain a non-null > 0 number of shards");
|
throw new IllegalArgumentException("settings must contain a non-null > 0 number of shards");
|
||||||
|
@ -584,15 +586,14 @@ public final class NodeEnvironment implements Closeable {
|
||||||
* Tries to lock the given shards ID. A shard lock is required to perform any kind of
|
* Tries to lock the given shards ID. A shard lock is required to perform any kind of
|
||||||
* write operation on a shards data directory like deleting files, creating a new index writer
|
* write operation on a shards data directory like deleting files, creating a new index writer
|
||||||
* or recover from a different shard instance into it. If the shard lock can not be acquired
|
* or recover from a different shard instance into it. If the shard lock can not be acquired
|
||||||
* an {@link LockObtainFailedException} is thrown.
|
* a {@link ShardLockObtainFailedException} is thrown.
|
||||||
*
|
*
|
||||||
* Note: this method will return immediately if the lock can't be acquired.
|
* Note: this method will return immediately if the lock can't be acquired.
|
||||||
*
|
*
|
||||||
* @param id the shard ID to lock
|
* @param id the shard ID to lock
|
||||||
* @return the shard lock. Call {@link ShardLock#close()} to release the lock
|
* @return the shard lock. Call {@link ShardLock#close()} to release the lock
|
||||||
* @throws IOException if an IOException occurs.
|
|
||||||
*/
|
*/
|
||||||
public ShardLock shardLock(ShardId id) throws IOException {
|
public ShardLock shardLock(ShardId id) throws ShardLockObtainFailedException {
|
||||||
return shardLock(id, 0);
|
return shardLock(id, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -600,13 +601,12 @@ public final class NodeEnvironment implements Closeable {
|
||||||
* Tries to lock the given shards ID. A shard lock is required to perform any kind of
|
* Tries to lock the given shards ID. A shard lock is required to perform any kind of
|
||||||
* write operation on a shards data directory like deleting files, creating a new index writer
|
* write operation on a shards data directory like deleting files, creating a new index writer
|
||||||
* or recover from a different shard instance into it. If the shard lock can not be acquired
|
* or recover from a different shard instance into it. If the shard lock can not be acquired
|
||||||
* an {@link org.apache.lucene.store.LockObtainFailedException} is thrown
|
* a {@link ShardLockObtainFailedException} is thrown
|
||||||
* @param shardId the shard ID to lock
|
* @param shardId the shard ID to lock
|
||||||
* @param lockTimeoutMS the lock timeout in milliseconds
|
* @param lockTimeoutMS the lock timeout in milliseconds
|
||||||
* @return the shard lock. Call {@link ShardLock#close()} to release the lock
|
* @return the shard lock. Call {@link ShardLock#close()} to release the lock
|
||||||
* @throws IOException if an IOException occurs.
|
|
||||||
*/
|
*/
|
||||||
public ShardLock shardLock(final ShardId shardId, long lockTimeoutMS) throws IOException {
|
public ShardLock shardLock(final ShardId shardId, long lockTimeoutMS) throws ShardLockObtainFailedException {
|
||||||
logger.trace("acquiring node shardlock on [{}], timeout [{}]", shardId, lockTimeoutMS);
|
logger.trace("acquiring node shardlock on [{}], timeout [{}]", shardId, lockTimeoutMS);
|
||||||
final InternalShardLock shardLock;
|
final InternalShardLock shardLock;
|
||||||
final boolean acquired;
|
final boolean acquired;
|
||||||
|
@ -647,8 +647,7 @@ public final class NodeEnvironment implements Closeable {
|
||||||
*/
|
*/
|
||||||
@FunctionalInterface
|
@FunctionalInterface
|
||||||
public interface ShardLocker {
|
public interface ShardLocker {
|
||||||
|
ShardLock lock(ShardId shardId, long lockTimeoutMS) throws ShardLockObtainFailedException;
|
||||||
ShardLock lock(ShardId shardId, long lockTimeoutMS) throws IOException;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -703,14 +702,15 @@ public final class NodeEnvironment implements Closeable {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void acquire(long timeoutInMillis) throws LockObtainFailedException{
|
void acquire(long timeoutInMillis) throws ShardLockObtainFailedException {
|
||||||
try {
|
try {
|
||||||
if (mutex.tryAcquire(timeoutInMillis, TimeUnit.MILLISECONDS) == false) {
|
if (mutex.tryAcquire(timeoutInMillis, TimeUnit.MILLISECONDS) == false) {
|
||||||
throw new LockObtainFailedException("Can't lock shard " + shardId + ", timed out after " + timeoutInMillis + "ms");
|
throw new ShardLockObtainFailedException(shardId,
|
||||||
|
"obtaining shard lock timed out after " + timeoutInMillis + "ms");
|
||||||
}
|
}
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
Thread.currentThread().interrupt();
|
Thread.currentThread().interrupt();
|
||||||
throw new LockObtainFailedException("Can't lock shard " + shardId + ", interrupted", e);
|
throw new ShardLockObtainFailedException(shardId, "thread interrupted while trying to obtain shard lock", e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -49,7 +49,7 @@ public abstract class ShardLock implements Closeable {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public final void close() throws IOException {
|
public final void close() {
|
||||||
if (this.closed.compareAndSet(false, true)) {
|
if (this.closed.compareAndSet(false, true)) {
|
||||||
closeInternal();
|
closeInternal();
|
||||||
}
|
}
|
||||||
|
|
48
core/src/main/java/org/elasticsearch/env/ShardLockObtainFailedException.java
vendored
Normal file
48
core/src/main/java/org/elasticsearch/env/ShardLockObtainFailedException.java
vendored
Normal file
|
@ -0,0 +1,48 @@
|
||||||
|
/*
|
||||||
|
* Licensed to Elasticsearch under one or more contributor
|
||||||
|
* license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright
|
||||||
|
* ownership. Elasticsearch licenses this file to you under
|
||||||
|
* the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
* not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.elasticsearch.env;
|
||||||
|
|
||||||
|
import org.elasticsearch.index.shard.ShardId;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Exception used when the in-memory lock for a shard cannot be obtained
|
||||||
|
*/
|
||||||
|
public class ShardLockObtainFailedException extends Exception {
|
||||||
|
private final ShardId shardId;
|
||||||
|
|
||||||
|
public ShardLockObtainFailedException(ShardId shardId, String message) {
|
||||||
|
super(message);
|
||||||
|
this.shardId = shardId;
|
||||||
|
}
|
||||||
|
|
||||||
|
public ShardLockObtainFailedException(ShardId shardId, String message, Throwable cause) {
|
||||||
|
super(message, cause);
|
||||||
|
this.shardId = shardId;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getMessage() {
|
||||||
|
StringBuffer sb = new StringBuffer();
|
||||||
|
sb.append(shardId.toString());
|
||||||
|
sb.append(": ");
|
||||||
|
sb.append(super.getMessage());
|
||||||
|
return sb.toString();
|
||||||
|
}
|
||||||
|
}
|
|
@ -39,6 +39,7 @@ import org.elasticsearch.common.xcontent.XContentFactory;
|
||||||
import org.elasticsearch.common.xcontent.XContentParser;
|
import org.elasticsearch.common.xcontent.XContentParser;
|
||||||
import org.elasticsearch.env.NodeEnvironment;
|
import org.elasticsearch.env.NodeEnvironment;
|
||||||
import org.elasticsearch.env.ShardLock;
|
import org.elasticsearch.env.ShardLock;
|
||||||
|
import org.elasticsearch.env.ShardLockObtainFailedException;
|
||||||
import org.elasticsearch.index.analysis.AnalysisRegistry;
|
import org.elasticsearch.index.analysis.AnalysisRegistry;
|
||||||
import org.elasticsearch.index.analysis.AnalysisService;
|
import org.elasticsearch.index.analysis.AnalysisService;
|
||||||
import org.elasticsearch.index.cache.IndexCache;
|
import org.elasticsearch.index.cache.IndexCache;
|
||||||
|
@ -279,8 +280,9 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
|
||||||
boolean success = false;
|
boolean success = false;
|
||||||
Store store = null;
|
Store store = null;
|
||||||
IndexShard indexShard = null;
|
IndexShard indexShard = null;
|
||||||
final ShardLock lock = nodeEnv.shardLock(shardId, TimeUnit.SECONDS.toMillis(5));
|
ShardLock lock = null;
|
||||||
try {
|
try {
|
||||||
|
lock = nodeEnv.shardLock(shardId, TimeUnit.SECONDS.toMillis(5));
|
||||||
eventListener.beforeIndexShardCreated(shardId, indexSettings);
|
eventListener.beforeIndexShardCreated(shardId, indexSettings);
|
||||||
ShardPath path;
|
ShardPath path;
|
||||||
try {
|
try {
|
||||||
|
@ -349,9 +351,13 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
|
||||||
shards = newMapBuilder(shards).put(shardId.id(), indexShard).immutableMap();
|
shards = newMapBuilder(shards).put(shardId.id(), indexShard).immutableMap();
|
||||||
success = true;
|
success = true;
|
||||||
return indexShard;
|
return indexShard;
|
||||||
|
} catch (ShardLockObtainFailedException e) {
|
||||||
|
throw new IOException("failed to obtain in-memory shard lock", e);
|
||||||
} finally {
|
} finally {
|
||||||
if (success == false) {
|
if (success == false) {
|
||||||
|
if (lock != null) {
|
||||||
IOUtils.closeWhileHandlingException(lock);
|
IOUtils.closeWhileHandlingException(lock);
|
||||||
|
}
|
||||||
closeShard("initialization failed", shardId, indexShard, store, eventListener);
|
closeShard("initialization failed", shardId, indexShard, store, eventListener);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -70,6 +70,7 @@ import org.elasticsearch.common.util.concurrent.RefCounted;
|
||||||
import org.elasticsearch.common.util.iterable.Iterables;
|
import org.elasticsearch.common.util.iterable.Iterables;
|
||||||
import org.elasticsearch.env.NodeEnvironment;
|
import org.elasticsearch.env.NodeEnvironment;
|
||||||
import org.elasticsearch.env.ShardLock;
|
import org.elasticsearch.env.ShardLock;
|
||||||
|
import org.elasticsearch.env.ShardLockObtainFailedException;
|
||||||
import org.elasticsearch.index.IndexSettings;
|
import org.elasticsearch.index.IndexSettings;
|
||||||
import org.elasticsearch.index.engine.Engine;
|
import org.elasticsearch.index.engine.Engine;
|
||||||
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
|
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
|
||||||
|
@ -388,6 +389,8 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
|
||||||
// that's fine - happens all the time no need to log
|
// that's fine - happens all the time no need to log
|
||||||
} catch (FileNotFoundException | NoSuchFileException ex) {
|
} catch (FileNotFoundException | NoSuchFileException ex) {
|
||||||
logger.info("Failed to open / find files while reading metadata snapshot");
|
logger.info("Failed to open / find files while reading metadata snapshot");
|
||||||
|
} catch (ShardLockObtainFailedException ex) {
|
||||||
|
logger.info("{}: failed to obtain shard lock", ex, shardId);
|
||||||
}
|
}
|
||||||
return MetadataSnapshot.EMPTY;
|
return MetadataSnapshot.EMPTY;
|
||||||
}
|
}
|
||||||
|
@ -418,6 +421,9 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
|
||||||
failIfCorrupted(dir, shardId);
|
failIfCorrupted(dir, shardId);
|
||||||
SegmentInfos segInfo = Lucene.readSegmentInfos(dir);
|
SegmentInfos segInfo = Lucene.readSegmentInfos(dir);
|
||||||
logger.trace("{} loaded segment info [{}]", shardId, segInfo);
|
logger.trace("{} loaded segment info [{}]", shardId, segInfo);
|
||||||
|
} catch (ShardLockObtainFailedException ex) {
|
||||||
|
logger.error("{} unable to acquire shard lock", ex, shardId);
|
||||||
|
throw new IOException(ex);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -62,6 +62,7 @@ import org.elasticsearch.common.util.concurrent.EsExecutors;
|
||||||
import org.elasticsearch.common.util.iterable.Iterables;
|
import org.elasticsearch.common.util.iterable.Iterables;
|
||||||
import org.elasticsearch.env.NodeEnvironment;
|
import org.elasticsearch.env.NodeEnvironment;
|
||||||
import org.elasticsearch.env.ShardLock;
|
import org.elasticsearch.env.ShardLock;
|
||||||
|
import org.elasticsearch.env.ShardLockObtainFailedException;
|
||||||
import org.elasticsearch.gateway.MetaDataStateFormat;
|
import org.elasticsearch.gateway.MetaDataStateFormat;
|
||||||
import org.elasticsearch.gateway.MetaStateService;
|
import org.elasticsearch.gateway.MetaStateService;
|
||||||
import org.elasticsearch.index.Index;
|
import org.elasticsearch.index.Index;
|
||||||
|
@ -676,7 +677,8 @@ public class IndicesService extends AbstractLifecycleComponent
|
||||||
* @param clusterState . This is required to access the indexes settings etc.
|
* @param clusterState . This is required to access the indexes settings etc.
|
||||||
* @throws IOException if an IOException occurs
|
* @throws IOException if an IOException occurs
|
||||||
*/
|
*/
|
||||||
public void deleteShardStore(String reason, ShardId shardId, ClusterState clusterState) throws IOException {
|
public void deleteShardStore(String reason, ShardId shardId, ClusterState clusterState)
|
||||||
|
throws IOException, ShardLockObtainFailedException {
|
||||||
final IndexMetaData metaData = clusterState.getMetaData().indices().get(shardId.getIndexName());
|
final IndexMetaData metaData = clusterState.getMetaData().indices().get(shardId.getIndexName());
|
||||||
|
|
||||||
final IndexSettings indexSettings = buildIndexSettings(metaData);
|
final IndexSettings indexSettings = buildIndexSettings(metaData);
|
||||||
|
@ -891,7 +893,8 @@ public class IndicesService extends AbstractLifecycleComponent
|
||||||
* @param timeout the timeout used for processing pending deletes
|
* @param timeout the timeout used for processing pending deletes
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public void processPendingDeletes(Index index, IndexSettings indexSettings, TimeValue timeout) throws IOException, InterruptedException {
|
public void processPendingDeletes(Index index, IndexSettings indexSettings, TimeValue timeout)
|
||||||
|
throws IOException, InterruptedException, ShardLockObtainFailedException {
|
||||||
logger.debug("{} processing pending deletes", index);
|
logger.debug("{} processing pending deletes", index);
|
||||||
final long startTimeNS = System.nanoTime();
|
final long startTimeNS = System.nanoTime();
|
||||||
final List<ShardLock> shardLocks = nodeEnv.lockAllForIndex(index, indexSettings, timeout.millis());
|
final List<ShardLock> shardLocks = nodeEnv.lockAllForIndex(index, indexSettings, timeout.millis());
|
||||||
|
|
|
@ -42,6 +42,7 @@ import org.elasticsearch.common.unit.TimeValue;
|
||||||
import org.elasticsearch.common.util.Callback;
|
import org.elasticsearch.common.util.Callback;
|
||||||
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
|
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
|
||||||
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
|
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
|
||||||
|
import org.elasticsearch.env.ShardLockObtainFailedException;
|
||||||
import org.elasticsearch.gateway.GatewayService;
|
import org.elasticsearch.gateway.GatewayService;
|
||||||
import org.elasticsearch.index.Index;
|
import org.elasticsearch.index.Index;
|
||||||
import org.elasticsearch.index.IndexComponent;
|
import org.elasticsearch.index.IndexComponent;
|
||||||
|
@ -835,6 +836,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
void processPendingDeletes(Index index, IndexSettings indexSettings, TimeValue timeValue) throws IOException, InterruptedException;
|
void processPendingDeletes(Index index, IndexSettings indexSettings, TimeValue timeValue)
|
||||||
|
throws IOException, InterruptedException, ShardLockObtainFailedException;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,7 +19,6 @@
|
||||||
package org.elasticsearch.env;
|
package org.elasticsearch.env;
|
||||||
|
|
||||||
import org.apache.lucene.index.SegmentInfos;
|
import org.apache.lucene.index.SegmentInfos;
|
||||||
import org.apache.lucene.store.LockObtainFailedException;
|
|
||||||
import org.apache.lucene.util.IOUtils;
|
import org.apache.lucene.util.IOUtils;
|
||||||
import org.apache.lucene.util.LuceneTestCase;
|
import org.apache.lucene.util.LuceneTestCase;
|
||||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||||
|
@ -129,7 +128,7 @@ public class NodeEnvironmentTests extends ESTestCase {
|
||||||
IOUtils.close(first, second);
|
IOUtils.close(first, second);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testShardLock() throws IOException {
|
public void testShardLock() throws Exception {
|
||||||
final NodeEnvironment env = newNodeEnvironment();
|
final NodeEnvironment env = newNodeEnvironment();
|
||||||
|
|
||||||
Index index = new Index("foo", "fooUUID");
|
Index index = new Index("foo", "fooUUID");
|
||||||
|
@ -139,7 +138,7 @@ public class NodeEnvironmentTests extends ESTestCase {
|
||||||
try {
|
try {
|
||||||
env.shardLock(new ShardId(index, 0));
|
env.shardLock(new ShardId(index, 0));
|
||||||
fail("shard is locked");
|
fail("shard is locked");
|
||||||
} catch (LockObtainFailedException ex) {
|
} catch (ShardLockObtainFailedException ex) {
|
||||||
// expected
|
// expected
|
||||||
}
|
}
|
||||||
for (Path path : env.indexPaths(index)) {
|
for (Path path : env.indexPaths(index)) {
|
||||||
|
@ -149,7 +148,7 @@ public class NodeEnvironmentTests extends ESTestCase {
|
||||||
try {
|
try {
|
||||||
env.lockAllForIndex(index, idxSettings, randomIntBetween(0, 10));
|
env.lockAllForIndex(index, idxSettings, randomIntBetween(0, 10));
|
||||||
fail("shard 0 is locked");
|
fail("shard 0 is locked");
|
||||||
} catch (LockObtainFailedException ex) {
|
} catch (ShardLockObtainFailedException ex) {
|
||||||
// expected
|
// expected
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -161,7 +160,7 @@ public class NodeEnvironmentTests extends ESTestCase {
|
||||||
try {
|
try {
|
||||||
env.shardLock(new ShardId(index, 0));
|
env.shardLock(new ShardId(index, 0));
|
||||||
fail("shard is locked");
|
fail("shard is locked");
|
||||||
} catch (LockObtainFailedException ex) {
|
} catch (ShardLockObtainFailedException ex) {
|
||||||
// expected
|
// expected
|
||||||
}
|
}
|
||||||
IOUtils.close(locks);
|
IOUtils.close(locks);
|
||||||
|
@ -213,13 +212,12 @@ public class NodeEnvironmentTests extends ESTestCase {
|
||||||
env.close();
|
env.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testDeleteSafe() throws IOException, InterruptedException {
|
public void testDeleteSafe() throws Exception {
|
||||||
final NodeEnvironment env = newNodeEnvironment();
|
final NodeEnvironment env = newNodeEnvironment();
|
||||||
final Index index = new Index("foo", "fooUUID");
|
final Index index = new Index("foo", "fooUUID");
|
||||||
ShardLock fooLock = env.shardLock(new ShardId(index, 0));
|
ShardLock fooLock = env.shardLock(new ShardId(index, 0));
|
||||||
assertEquals(new ShardId(index, 0), fooLock.getShardId());
|
assertEquals(new ShardId(index, 0), fooLock.getShardId());
|
||||||
|
|
||||||
|
|
||||||
for (Path path : env.indexPaths(index)) {
|
for (Path path : env.indexPaths(index)) {
|
||||||
Files.createDirectories(path.resolve("0"));
|
Files.createDirectories(path.resolve("0"));
|
||||||
Files.createDirectories(path.resolve("1"));
|
Files.createDirectories(path.resolve("1"));
|
||||||
|
@ -228,14 +226,13 @@ public class NodeEnvironmentTests extends ESTestCase {
|
||||||
try {
|
try {
|
||||||
env.deleteShardDirectorySafe(new ShardId(index, 0), idxSettings);
|
env.deleteShardDirectorySafe(new ShardId(index, 0), idxSettings);
|
||||||
fail("shard is locked");
|
fail("shard is locked");
|
||||||
} catch (LockObtainFailedException ex) {
|
} catch (ShardLockObtainFailedException ex) {
|
||||||
// expected
|
// expected
|
||||||
}
|
}
|
||||||
|
|
||||||
for (Path path : env.indexPaths(index)) {
|
for (Path path : env.indexPaths(index)) {
|
||||||
assertTrue(Files.exists(path.resolve("0")));
|
assertTrue(Files.exists(path.resolve("0")));
|
||||||
assertTrue(Files.exists(path.resolve("1")));
|
assertTrue(Files.exists(path.resolve("1")));
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
env.deleteShardDirectorySafe(new ShardId(index, 1), idxSettings);
|
env.deleteShardDirectorySafe(new ShardId(index, 1), idxSettings);
|
||||||
|
@ -248,7 +245,7 @@ public class NodeEnvironmentTests extends ESTestCase {
|
||||||
try {
|
try {
|
||||||
env.deleteIndexDirectorySafe(index, randomIntBetween(0, 10), idxSettings);
|
env.deleteIndexDirectorySafe(index, randomIntBetween(0, 10), idxSettings);
|
||||||
fail("shard is locked");
|
fail("shard is locked");
|
||||||
} catch (LockObtainFailedException ex) {
|
} catch (ShardLockObtainFailedException ex) {
|
||||||
// expected
|
// expected
|
||||||
}
|
}
|
||||||
fooLock.close();
|
fooLock.close();
|
||||||
|
@ -338,10 +335,8 @@ public class NodeEnvironmentTests extends ESTestCase {
|
||||||
assertEquals(flipFlop[shard].incrementAndGet(), 1);
|
assertEquals(flipFlop[shard].incrementAndGet(), 1);
|
||||||
assertEquals(flipFlop[shard].decrementAndGet(), 0);
|
assertEquals(flipFlop[shard].decrementAndGet(), 0);
|
||||||
}
|
}
|
||||||
} catch (LockObtainFailedException ex) {
|
} catch (ShardLockObtainFailedException ex) {
|
||||||
// ok
|
// ok
|
||||||
} catch (IOException ex) {
|
|
||||||
fail(ex.toString());
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -33,6 +33,7 @@ import org.elasticsearch.common.io.FileSystemUtils;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.common.unit.TimeValue;
|
import org.elasticsearch.common.unit.TimeValue;
|
||||||
import org.elasticsearch.env.NodeEnvironment;
|
import org.elasticsearch.env.NodeEnvironment;
|
||||||
|
import org.elasticsearch.env.ShardLockObtainFailedException;
|
||||||
import org.elasticsearch.gateway.GatewayMetaState;
|
import org.elasticsearch.gateway.GatewayMetaState;
|
||||||
import org.elasticsearch.gateway.LocalAllocateDangledIndices;
|
import org.elasticsearch.gateway.LocalAllocateDangledIndices;
|
||||||
import org.elasticsearch.gateway.MetaStateService;
|
import org.elasticsearch.gateway.MetaStateService;
|
||||||
|
@ -172,7 +173,7 @@ public class IndicesServiceTests extends ESSingleNodeTestCase {
|
||||||
try {
|
try {
|
||||||
indicesService.processPendingDeletes(test.index(), test.getIndexSettings(), new TimeValue(0, TimeUnit.MILLISECONDS));
|
indicesService.processPendingDeletes(test.index(), test.getIndexSettings(), new TimeValue(0, TimeUnit.MILLISECONDS));
|
||||||
fail("can't get lock");
|
fail("can't get lock");
|
||||||
} catch (LockObtainFailedException ex) {
|
} catch (ShardLockObtainFailedException ex) {
|
||||||
|
|
||||||
}
|
}
|
||||||
assertTrue(path.exists());
|
assertTrue(path.exists());
|
||||||
|
|
|
@ -67,6 +67,7 @@ import org.elasticsearch.common.util.concurrent.ThreadContext;
|
||||||
import org.elasticsearch.discovery.DiscoverySettings;
|
import org.elasticsearch.discovery.DiscoverySettings;
|
||||||
import org.elasticsearch.env.Environment;
|
import org.elasticsearch.env.Environment;
|
||||||
import org.elasticsearch.env.NodeEnvironment;
|
import org.elasticsearch.env.NodeEnvironment;
|
||||||
|
import org.elasticsearch.env.ShardLockObtainFailedException;
|
||||||
import org.elasticsearch.http.HttpServerTransport;
|
import org.elasticsearch.http.HttpServerTransport;
|
||||||
import org.elasticsearch.index.Index;
|
import org.elasticsearch.index.Index;
|
||||||
import org.elasticsearch.index.IndexService;
|
import org.elasticsearch.index.IndexService;
|
||||||
|
@ -1912,7 +1913,7 @@ public final class InternalTestCluster extends TestCluster {
|
||||||
for (ShardId id : shardIds) {
|
for (ShardId id : shardIds) {
|
||||||
try {
|
try {
|
||||||
env.shardLock(id, TimeUnit.SECONDS.toMillis(5)).close();
|
env.shardLock(id, TimeUnit.SECONDS.toMillis(5)).close();
|
||||||
} catch (IOException ex) {
|
} catch (ShardLockObtainFailedException ex) {
|
||||||
fail("Shard " + id + " is still locked after 5 sec waiting");
|
fail("Shard " + id + " is still locked after 5 sec waiting");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue