mirror of
https://github.com/honeymoose/OpenSearch.git
synced 2025-02-17 10:25:15 +00:00
Add details about what acquired the shard lock last (#38807)
This adds a `details` parameter to shard locking in `NodeEnvironment`. This is intended to be used for diagnosing issues such as ``` 1> [2019-02-11T14:34:19,262][INFO ][o.e.c.m.MetaDataDeleteIndexService] [node_s0] [.tasks/oSYOG0-9SHOx_pfAoiSExQ] deleting index 1> [2019-02-11T14:34:19,279][WARN ][o.e.i.IndicesService ] [node_s0] [.tasks/oSYOG0-9SHOx_pfAoiSExQ] failed to delete index 1> org.elasticsearch.env.ShardLockObtainFailedException: [.tasks][0]: obtaining shard lock timed out after 0ms 1> at org.elasticsearch.env.NodeEnvironment$InternalShardLock.acquire(NodeEnvironment.java:736) ~[main/:?] 1> at org.elasticsearch.env.NodeEnvironment.shardLock(NodeEnvironment.java:655) ~[main/:?] 1> at org.elasticsearch.env.NodeEnvironment.lockAllForIndex(NodeEnvironment.java:601) ~[main/:?] 1> at org.elasticsearch.env.NodeEnvironment.deleteIndexDirectorySafe(NodeEnvironment.java:554) ~[main/:?] ``` In the hope that we will be able to determine why the shard is still locked. Relates to #30290 as well as some other CI failures
This commit is contained in:
parent
e564c4d8ad
commit
dae48ba262
@ -19,12 +19,8 @@
|
|||||||
|
|
||||||
package org.elasticsearch.env;
|
package org.elasticsearch.env;
|
||||||
|
|
||||||
import java.io.UncheckedIOException;
|
|
||||||
import java.util.Iterator;
|
|
||||||
import java.util.stream.Collectors;
|
|
||||||
import java.util.stream.Stream;
|
|
||||||
import org.apache.logging.log4j.Logger;
|
|
||||||
import org.apache.logging.log4j.LogManager;
|
import org.apache.logging.log4j.LogManager;
|
||||||
|
import org.apache.logging.log4j.Logger;
|
||||||
import org.apache.logging.log4j.message.ParameterizedMessage;
|
import org.apache.logging.log4j.message.ParameterizedMessage;
|
||||||
import org.apache.lucene.index.IndexWriter;
|
import org.apache.lucene.index.IndexWriter;
|
||||||
import org.apache.lucene.index.SegmentInfos;
|
import org.apache.lucene.index.SegmentInfos;
|
||||||
@ -34,22 +30,22 @@ import org.apache.lucene.store.Lock;
|
|||||||
import org.apache.lucene.store.LockObtainFailedException;
|
import org.apache.lucene.store.LockObtainFailedException;
|
||||||
import org.apache.lucene.store.NativeFSLockFactory;
|
import org.apache.lucene.store.NativeFSLockFactory;
|
||||||
import org.apache.lucene.store.SimpleFSDirectory;
|
import org.apache.lucene.store.SimpleFSDirectory;
|
||||||
import org.elasticsearch.common.CheckedFunction;
|
|
||||||
import org.elasticsearch.common.lease.Releasable;
|
|
||||||
import org.elasticsearch.core.internal.io.IOUtils;
|
|
||||||
import org.elasticsearch.ElasticsearchException;
|
import org.elasticsearch.ElasticsearchException;
|
||||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||||
|
import org.elasticsearch.common.CheckedFunction;
|
||||||
import org.elasticsearch.common.Randomness;
|
import org.elasticsearch.common.Randomness;
|
||||||
import org.elasticsearch.common.SuppressForbidden;
|
import org.elasticsearch.common.SuppressForbidden;
|
||||||
import org.elasticsearch.common.UUIDs;
|
import org.elasticsearch.common.UUIDs;
|
||||||
import org.elasticsearch.common.io.FileSystemUtils;
|
import org.elasticsearch.common.io.FileSystemUtils;
|
||||||
|
import org.elasticsearch.common.lease.Releasable;
|
||||||
import org.elasticsearch.common.settings.Setting;
|
import org.elasticsearch.common.settings.Setting;
|
||||||
import org.elasticsearch.common.settings.Setting.Property;
|
import org.elasticsearch.common.settings.Setting.Property;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||||
import org.elasticsearch.common.unit.TimeValue;
|
import org.elasticsearch.common.unit.TimeValue;
|
||||||
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
|
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
|
||||||
|
import org.elasticsearch.core.internal.io.IOUtils;
|
||||||
import org.elasticsearch.gateway.MetaDataStateFormat;
|
import org.elasticsearch.gateway.MetaDataStateFormat;
|
||||||
import org.elasticsearch.index.Index;
|
import org.elasticsearch.index.Index;
|
||||||
import org.elasticsearch.index.IndexSettings;
|
import org.elasticsearch.index.IndexSettings;
|
||||||
@ -63,6 +59,7 @@ import org.elasticsearch.node.Node;
|
|||||||
|
|
||||||
import java.io.Closeable;
|
import java.io.Closeable;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.io.UncheckedIOException;
|
||||||
import java.nio.file.AtomicMoveNotSupportedException;
|
import java.nio.file.AtomicMoveNotSupportedException;
|
||||||
import java.nio.file.DirectoryStream;
|
import java.nio.file.DirectoryStream;
|
||||||
import java.nio.file.FileStore;
|
import java.nio.file.FileStore;
|
||||||
@ -74,6 +71,7 @@ import java.util.Arrays;
|
|||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
|
import java.util.Iterator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Locale;
|
import java.util.Locale;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
@ -84,6 +82,8 @@ import java.util.concurrent.TimeUnit;
|
|||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
import java.util.concurrent.atomic.AtomicReference;
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
import java.util.function.Predicate;
|
import java.util.function.Predicate;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
import static java.util.Collections.unmodifiableSet;
|
import static java.util.Collections.unmodifiableSet;
|
||||||
|
|
||||||
@ -440,7 +440,7 @@ public final class NodeEnvironment implements Closeable {
|
|||||||
public void deleteShardDirectorySafe(ShardId shardId, IndexSettings indexSettings) throws IOException, ShardLockObtainFailedException {
|
public void deleteShardDirectorySafe(ShardId shardId, IndexSettings indexSettings) throws IOException, ShardLockObtainFailedException {
|
||||||
final Path[] paths = availableShardPaths(shardId);
|
final Path[] paths = availableShardPaths(shardId);
|
||||||
logger.trace("deleting shard {} directory, paths: [{}]", shardId, paths);
|
logger.trace("deleting shard {} directory, paths: [{}]", shardId, paths);
|
||||||
try (ShardLock lock = shardLock(shardId)) {
|
try (ShardLock lock = shardLock(shardId, "shard deletion under lock")) {
|
||||||
deleteShardDirectoryUnderLock(lock, indexSettings);
|
deleteShardDirectoryUnderLock(lock, indexSettings);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -532,7 +532,7 @@ public final class NodeEnvironment implements Closeable {
|
|||||||
|
|
||||||
private boolean isShardLocked(ShardId id) {
|
private boolean isShardLocked(ShardId id) {
|
||||||
try {
|
try {
|
||||||
shardLock(id, 0).close();
|
shardLock(id, "checking if shard is locked").close();
|
||||||
return false;
|
return false;
|
||||||
} catch (ShardLockObtainFailedException ex) {
|
} catch (ShardLockObtainFailedException ex) {
|
||||||
return true;
|
return true;
|
||||||
@ -551,7 +551,7 @@ public final class NodeEnvironment implements Closeable {
|
|||||||
*/
|
*/
|
||||||
public void deleteIndexDirectorySafe(Index index, long lockTimeoutMS, IndexSettings indexSettings)
|
public void deleteIndexDirectorySafe(Index index, long lockTimeoutMS, IndexSettings indexSettings)
|
||||||
throws IOException, ShardLockObtainFailedException {
|
throws IOException, ShardLockObtainFailedException {
|
||||||
final List<ShardLock> locks = lockAllForIndex(index, indexSettings, lockTimeoutMS);
|
final List<ShardLock> locks = lockAllForIndex(index, indexSettings, "deleting index directory", lockTimeoutMS);
|
||||||
try {
|
try {
|
||||||
deleteIndexDirectoryUnderLock(index, indexSettings);
|
deleteIndexDirectoryUnderLock(index, indexSettings);
|
||||||
} finally {
|
} finally {
|
||||||
@ -586,7 +586,8 @@ public final class NodeEnvironment implements Closeable {
|
|||||||
* @param lockTimeoutMS how long to wait for acquiring the indices shard locks
|
* @param lockTimeoutMS how long to wait for acquiring the indices shard locks
|
||||||
* @return the {@link ShardLock} instances for this index.
|
* @return the {@link ShardLock} instances for this index.
|
||||||
*/
|
*/
|
||||||
public List<ShardLock> lockAllForIndex(Index index, IndexSettings settings, long lockTimeoutMS) throws ShardLockObtainFailedException {
|
public List<ShardLock> lockAllForIndex(final Index index, final IndexSettings settings,
|
||||||
|
final String lockDetails, final long lockTimeoutMS) throws ShardLockObtainFailedException {
|
||||||
final int numShards = settings.getNumberOfShards();
|
final int numShards = settings.getNumberOfShards();
|
||||||
if (numShards <= 0) {
|
if (numShards <= 0) {
|
||||||
throw new IllegalArgumentException("settings must contain a non-null > 0 number of shards");
|
throw new IllegalArgumentException("settings must contain a non-null > 0 number of shards");
|
||||||
@ -598,7 +599,7 @@ public final class NodeEnvironment implements Closeable {
|
|||||||
try {
|
try {
|
||||||
for (int i = 0; i < numShards; i++) {
|
for (int i = 0; i < numShards; i++) {
|
||||||
long timeoutLeftMS = Math.max(0, lockTimeoutMS - TimeValue.nsecToMSec((System.nanoTime() - startTimeNS)));
|
long timeoutLeftMS = Math.max(0, lockTimeoutMS - TimeValue.nsecToMSec((System.nanoTime() - startTimeNS)));
|
||||||
allLocks.add(shardLock(new ShardId(index, i), timeoutLeftMS));
|
allLocks.add(shardLock(new ShardId(index, i), lockDetails, timeoutLeftMS));
|
||||||
}
|
}
|
||||||
success = true;
|
success = true;
|
||||||
} finally {
|
} finally {
|
||||||
@ -619,10 +620,11 @@ public final class NodeEnvironment implements Closeable {
|
|||||||
* Note: this method will return immediately if the lock can't be acquired.
|
* Note: this method will return immediately if the lock can't be acquired.
|
||||||
*
|
*
|
||||||
* @param id the shard ID to lock
|
* @param id the shard ID to lock
|
||||||
|
* @param details information about why the shard is being locked
|
||||||
* @return the shard lock. Call {@link ShardLock#close()} to release the lock
|
* @return the shard lock. Call {@link ShardLock#close()} to release the lock
|
||||||
*/
|
*/
|
||||||
public ShardLock shardLock(ShardId id) throws ShardLockObtainFailedException {
|
public ShardLock shardLock(ShardId id, final String details) throws ShardLockObtainFailedException {
|
||||||
return shardLock(id, 0);
|
return shardLock(id, details, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -631,11 +633,13 @@ public final class NodeEnvironment implements Closeable {
|
|||||||
* or recover from a different shard instance into it. If the shard lock can not be acquired
|
* or recover from a different shard instance into it. If the shard lock can not be acquired
|
||||||
* a {@link ShardLockObtainFailedException} is thrown
|
* a {@link ShardLockObtainFailedException} is thrown
|
||||||
* @param shardId the shard ID to lock
|
* @param shardId the shard ID to lock
|
||||||
|
* @param details information about why the shard is being locked
|
||||||
* @param lockTimeoutMS the lock timeout in milliseconds
|
* @param lockTimeoutMS the lock timeout in milliseconds
|
||||||
* @return the shard lock. Call {@link ShardLock#close()} to release the lock
|
* @return the shard lock. Call {@link ShardLock#close()} to release the lock
|
||||||
*/
|
*/
|
||||||
public ShardLock shardLock(final ShardId shardId, long lockTimeoutMS) throws ShardLockObtainFailedException {
|
public ShardLock shardLock(final ShardId shardId, final String details,
|
||||||
logger.trace("acquiring node shardlock on [{}], timeout [{}]", shardId, lockTimeoutMS);
|
final long lockTimeoutMS) throws ShardLockObtainFailedException {
|
||||||
|
logger.trace("acquiring node shardlock on [{}], timeout [{}], details [{}]", shardId, lockTimeoutMS, details);
|
||||||
final InternalShardLock shardLock;
|
final InternalShardLock shardLock;
|
||||||
final boolean acquired;
|
final boolean acquired;
|
||||||
synchronized (shardLocks) {
|
synchronized (shardLocks) {
|
||||||
@ -644,7 +648,7 @@ public final class NodeEnvironment implements Closeable {
|
|||||||
shardLock.incWaitCount();
|
shardLock.incWaitCount();
|
||||||
acquired = false;
|
acquired = false;
|
||||||
} else {
|
} else {
|
||||||
shardLock = new InternalShardLock(shardId);
|
shardLock = new InternalShardLock(shardId, details);
|
||||||
shardLocks.put(shardId, shardLock);
|
shardLocks.put(shardId, shardLock);
|
||||||
acquired = true;
|
acquired = true;
|
||||||
}
|
}
|
||||||
@ -652,7 +656,7 @@ public final class NodeEnvironment implements Closeable {
|
|||||||
if (acquired == false) {
|
if (acquired == false) {
|
||||||
boolean success = false;
|
boolean success = false;
|
||||||
try {
|
try {
|
||||||
shardLock.acquire(lockTimeoutMS);
|
shardLock.acquire(lockTimeoutMS, details);
|
||||||
success = true;
|
success = true;
|
||||||
} finally {
|
} finally {
|
||||||
if (success == false) {
|
if (success == false) {
|
||||||
@ -671,11 +675,11 @@ public final class NodeEnvironment implements Closeable {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A functional interface that people can use to reference {@link #shardLock(ShardId, long)}
|
* A functional interface that people can use to reference {@link #shardLock(ShardId, String, long)}
|
||||||
*/
|
*/
|
||||||
@FunctionalInterface
|
@FunctionalInterface
|
||||||
public interface ShardLocker {
|
public interface ShardLocker {
|
||||||
ShardLock lock(ShardId shardId, long lockTimeoutMS) throws ShardLockObtainFailedException;
|
ShardLock lock(ShardId shardId, String lockDetails, long lockTimeoutMS) throws ShardLockObtainFailedException;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -698,11 +702,13 @@ public final class NodeEnvironment implements Closeable {
|
|||||||
*/
|
*/
|
||||||
private final Semaphore mutex = new Semaphore(1);
|
private final Semaphore mutex = new Semaphore(1);
|
||||||
private int waitCount = 1; // guarded by shardLocks
|
private int waitCount = 1; // guarded by shardLocks
|
||||||
|
private String lockDetails;
|
||||||
private final ShardId shardId;
|
private final ShardId shardId;
|
||||||
|
|
||||||
InternalShardLock(ShardId shardId) {
|
InternalShardLock(final ShardId shardId, final String details) {
|
||||||
this.shardId = shardId;
|
this.shardId = shardId;
|
||||||
mutex.acquireUninterruptibly();
|
mutex.acquireUninterruptibly();
|
||||||
|
lockDetails = details;
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void release() {
|
protected void release() {
|
||||||
@ -730,11 +736,14 @@ public final class NodeEnvironment implements Closeable {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void acquire(long timeoutInMillis) throws ShardLockObtainFailedException {
|
void acquire(long timeoutInMillis, final String details) throws ShardLockObtainFailedException {
|
||||||
try {
|
try {
|
||||||
if (mutex.tryAcquire(timeoutInMillis, TimeUnit.MILLISECONDS) == false) {
|
if (mutex.tryAcquire(timeoutInMillis, TimeUnit.MILLISECONDS)) {
|
||||||
|
lockDetails = details;
|
||||||
|
} else {
|
||||||
throw new ShardLockObtainFailedException(shardId,
|
throw new ShardLockObtainFailedException(shardId,
|
||||||
"obtaining shard lock timed out after " + timeoutInMillis + "ms");
|
"obtaining shard lock timed out after " + timeoutInMillis + "ms, previous lock details: [" + lockDetails +
|
||||||
|
"] trying to lock for [" + details + "]");
|
||||||
}
|
}
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
Thread.currentThread().interrupt();
|
Thread.currentThread().interrupt();
|
||||||
|
@ -334,7 +334,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
|
|||||||
IndexShard indexShard = null;
|
IndexShard indexShard = null;
|
||||||
ShardLock lock = null;
|
ShardLock lock = null;
|
||||||
try {
|
try {
|
||||||
lock = nodeEnv.shardLock(shardId, TimeUnit.SECONDS.toMillis(5));
|
lock = nodeEnv.shardLock(shardId, "shard creation", TimeUnit.SECONDS.toMillis(5));
|
||||||
eventListener.beforeIndexShardCreated(shardId, indexSettings);
|
eventListener.beforeIndexShardCreated(shardId, indexSettings);
|
||||||
ShardPath path;
|
ShardPath path;
|
||||||
try {
|
try {
|
||||||
|
@ -436,7 +436,7 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
|
|||||||
*/
|
*/
|
||||||
public static MetadataSnapshot readMetadataSnapshot(Path indexLocation, ShardId shardId, NodeEnvironment.ShardLocker shardLocker,
|
public static MetadataSnapshot readMetadataSnapshot(Path indexLocation, ShardId shardId, NodeEnvironment.ShardLocker shardLocker,
|
||||||
Logger logger) throws IOException {
|
Logger logger) throws IOException {
|
||||||
try (ShardLock lock = shardLocker.lock(shardId, TimeUnit.SECONDS.toMillis(5));
|
try (ShardLock lock = shardLocker.lock(shardId, "read metadata snapshot", TimeUnit.SECONDS.toMillis(5));
|
||||||
Directory dir = new SimpleFSDirectory(indexLocation)) {
|
Directory dir = new SimpleFSDirectory(indexLocation)) {
|
||||||
failIfCorrupted(dir, shardId);
|
failIfCorrupted(dir, shardId);
|
||||||
return new MetadataSnapshot(null, dir, logger);
|
return new MetadataSnapshot(null, dir, logger);
|
||||||
@ -457,7 +457,7 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
|
|||||||
*/
|
*/
|
||||||
public static void tryOpenIndex(Path indexLocation, ShardId shardId, NodeEnvironment.ShardLocker shardLocker,
|
public static void tryOpenIndex(Path indexLocation, ShardId shardId, NodeEnvironment.ShardLocker shardLocker,
|
||||||
Logger logger) throws IOException, ShardLockObtainFailedException {
|
Logger logger) throws IOException, ShardLockObtainFailedException {
|
||||||
try (ShardLock lock = shardLocker.lock(shardId, TimeUnit.SECONDS.toMillis(5));
|
try (ShardLock lock = shardLocker.lock(shardId, "open index", TimeUnit.SECONDS.toMillis(5));
|
||||||
Directory dir = new SimpleFSDirectory(indexLocation)) {
|
Directory dir = new SimpleFSDirectory(indexLocation)) {
|
||||||
failIfCorrupted(dir, shardId);
|
failIfCorrupted(dir, shardId);
|
||||||
SegmentInfos segInfo = Lucene.readSegmentInfos(dir);
|
SegmentInfos segInfo = Lucene.readSegmentInfos(dir);
|
||||||
|
@ -1060,7 +1060,7 @@ public class IndicesService extends AbstractLifecycleComponent
|
|||||||
throws IOException, InterruptedException, ShardLockObtainFailedException {
|
throws IOException, InterruptedException, ShardLockObtainFailedException {
|
||||||
logger.debug("{} processing pending deletes", index);
|
logger.debug("{} processing pending deletes", index);
|
||||||
final long startTimeNS = System.nanoTime();
|
final long startTimeNS = System.nanoTime();
|
||||||
final List<ShardLock> shardLocks = nodeEnv.lockAllForIndex(index, indexSettings, timeout.millis());
|
final List<ShardLock> shardLocks = nodeEnv.lockAllForIndex(index, indexSettings, "process pending deletes", timeout.millis());
|
||||||
int numRemoved = 0;
|
int numRemoved = 0;
|
||||||
try {
|
try {
|
||||||
Map<ShardId, ShardLock> locks = new HashMap<>();
|
Map<ShardId, ShardLock> locks = new HashMap<>();
|
||||||
|
@ -135,11 +135,11 @@ public class NodeEnvironmentTests extends ESTestCase {
|
|||||||
final NodeEnvironment env = newNodeEnvironment();
|
final NodeEnvironment env = newNodeEnvironment();
|
||||||
|
|
||||||
Index index = new Index("foo", "fooUUID");
|
Index index = new Index("foo", "fooUUID");
|
||||||
ShardLock fooLock = env.shardLock(new ShardId(index, 0));
|
ShardLock fooLock = env.shardLock(new ShardId(index, 0), "1");
|
||||||
assertEquals(new ShardId(index, 0), fooLock.getShardId());
|
assertEquals(new ShardId(index, 0), fooLock.getShardId());
|
||||||
|
|
||||||
try {
|
try {
|
||||||
env.shardLock(new ShardId(index, 0));
|
env.shardLock(new ShardId(index, 0), "2");
|
||||||
fail("shard is locked");
|
fail("shard is locked");
|
||||||
} catch (ShardLockObtainFailedException ex) {
|
} catch (ShardLockObtainFailedException ex) {
|
||||||
// expected
|
// expected
|
||||||
@ -149,7 +149,7 @@ public class NodeEnvironmentTests extends ESTestCase {
|
|||||||
Files.createDirectories(path.resolve("1"));
|
Files.createDirectories(path.resolve("1"));
|
||||||
}
|
}
|
||||||
try {
|
try {
|
||||||
env.lockAllForIndex(index, idxSettings, randomIntBetween(0, 10));
|
env.lockAllForIndex(index, idxSettings, "3", randomIntBetween(0, 10));
|
||||||
fail("shard 0 is locked");
|
fail("shard 0 is locked");
|
||||||
} catch (ShardLockObtainFailedException ex) {
|
} catch (ShardLockObtainFailedException ex) {
|
||||||
// expected
|
// expected
|
||||||
@ -157,11 +157,11 @@ public class NodeEnvironmentTests extends ESTestCase {
|
|||||||
|
|
||||||
fooLock.close();
|
fooLock.close();
|
||||||
// can lock again?
|
// can lock again?
|
||||||
env.shardLock(new ShardId(index, 0)).close();
|
env.shardLock(new ShardId(index, 0), "4").close();
|
||||||
|
|
||||||
List<ShardLock> locks = env.lockAllForIndex(index, idxSettings, randomIntBetween(0, 10));
|
List<ShardLock> locks = env.lockAllForIndex(index, idxSettings, "5", randomIntBetween(0, 10));
|
||||||
try {
|
try {
|
||||||
env.shardLock(new ShardId(index, 0));
|
env.shardLock(new ShardId(index, 0), "6");
|
||||||
fail("shard is locked");
|
fail("shard is locked");
|
||||||
} catch (ShardLockObtainFailedException ex) {
|
} catch (ShardLockObtainFailedException ex) {
|
||||||
// expected
|
// expected
|
||||||
@ -239,7 +239,7 @@ public class NodeEnvironmentTests extends ESTestCase {
|
|||||||
public void testDeleteSafe() throws Exception {
|
public void testDeleteSafe() throws Exception {
|
||||||
final NodeEnvironment env = newNodeEnvironment();
|
final NodeEnvironment env = newNodeEnvironment();
|
||||||
final Index index = new Index("foo", "fooUUID");
|
final Index index = new Index("foo", "fooUUID");
|
||||||
ShardLock fooLock = env.shardLock(new ShardId(index, 0));
|
ShardLock fooLock = env.shardLock(new ShardId(index, 0), "1");
|
||||||
assertEquals(new ShardId(index, 0), fooLock.getShardId());
|
assertEquals(new ShardId(index, 0), fooLock.getShardId());
|
||||||
|
|
||||||
for (Path path : env.indexPaths(index)) {
|
for (Path path : env.indexPaths(index)) {
|
||||||
@ -295,7 +295,7 @@ public class NodeEnvironmentTests extends ESTestCase {
|
|||||||
@Override
|
@Override
|
||||||
protected void doRun() throws Exception {
|
protected void doRun() throws Exception {
|
||||||
start.await();
|
start.await();
|
||||||
try (ShardLock autoCloses = env.shardLock(new ShardId(index, 0))) {
|
try (ShardLock autoCloses = env.shardLock(new ShardId(index, 0), "2")) {
|
||||||
blockLatch.countDown();
|
blockLatch.countDown();
|
||||||
Thread.sleep(randomIntBetween(1, 10));
|
Thread.sleep(randomIntBetween(1, 10));
|
||||||
}
|
}
|
||||||
@ -353,7 +353,7 @@ public class NodeEnvironmentTests extends ESTestCase {
|
|||||||
for (int i = 0; i < iters; i++) {
|
for (int i = 0; i < iters; i++) {
|
||||||
int shard = randomIntBetween(0, counts.length - 1);
|
int shard = randomIntBetween(0, counts.length - 1);
|
||||||
try {
|
try {
|
||||||
try (ShardLock autoCloses = env.shardLock(new ShardId("foo", "fooUUID", shard),
|
try (ShardLock autoCloses = env.shardLock(new ShardId("foo", "fooUUID", shard), "1",
|
||||||
scaledRandomIntBetween(0, 10))) {
|
scaledRandomIntBetween(0, 10))) {
|
||||||
counts[shard].value++;
|
counts[shard].value++;
|
||||||
countsAtomic[shard].incrementAndGet();
|
countsAtomic[shard].incrementAndGet();
|
||||||
|
@ -263,7 +263,7 @@ public class IndexShardTests extends IndexShardTestCase {
|
|||||||
assertEquals(shardStateMetaData, getShardStateMetadata(shard));
|
assertEquals(shardStateMetaData, getShardStateMetadata(shard));
|
||||||
// but index can't be opened for a failed shard
|
// but index can't be opened for a failed shard
|
||||||
assertThat("store index should be corrupted", StoreUtils.canOpenIndex(logger, shardPath.resolveIndex(), shard.shardId(),
|
assertThat("store index should be corrupted", StoreUtils.canOpenIndex(logger, shardPath.resolveIndex(), shard.shardId(),
|
||||||
(shardId, lockTimeoutMS) -> new DummyShardLock(shardId)),
|
(shardId, lockTimeoutMS, details) -> new DummyShardLock(shardId)),
|
||||||
equalTo(false));
|
equalTo(false));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -926,17 +926,17 @@ public class StoreTests extends ESTestCase {
|
|||||||
IndexWriterConfig iwc = newIndexWriterConfig();
|
IndexWriterConfig iwc = newIndexWriterConfig();
|
||||||
Path tempDir = createTempDir();
|
Path tempDir = createTempDir();
|
||||||
final BaseDirectoryWrapper dir = newFSDirectory(tempDir);
|
final BaseDirectoryWrapper dir = newFSDirectory(tempDir);
|
||||||
assertFalse(StoreUtils.canOpenIndex(logger, tempDir, shardId, (id, l) -> new DummyShardLock(id)));
|
assertFalse(StoreUtils.canOpenIndex(logger, tempDir, shardId, (id, l, d) -> new DummyShardLock(id)));
|
||||||
IndexWriter writer = new IndexWriter(dir, iwc);
|
IndexWriter writer = new IndexWriter(dir, iwc);
|
||||||
Document doc = new Document();
|
Document doc = new Document();
|
||||||
doc.add(new StringField("id", "1", random().nextBoolean() ? Field.Store.YES : Field.Store.NO));
|
doc.add(new StringField("id", "1", random().nextBoolean() ? Field.Store.YES : Field.Store.NO));
|
||||||
writer.addDocument(doc);
|
writer.addDocument(doc);
|
||||||
writer.commit();
|
writer.commit();
|
||||||
writer.close();
|
writer.close();
|
||||||
assertTrue(StoreUtils.canOpenIndex(logger, tempDir, shardId, (id, l) -> new DummyShardLock(id)));
|
assertTrue(StoreUtils.canOpenIndex(logger, tempDir, shardId, (id, l, d) -> new DummyShardLock(id)));
|
||||||
Store store = new Store(shardId, INDEX_SETTINGS, dir, new DummyShardLock(shardId));
|
Store store = new Store(shardId, INDEX_SETTINGS, dir, new DummyShardLock(shardId));
|
||||||
store.markStoreCorrupted(new CorruptIndexException("foo", "bar"));
|
store.markStoreCorrupted(new CorruptIndexException("foo", "bar"));
|
||||||
assertFalse(StoreUtils.canOpenIndex(logger, tempDir, shardId, (id, l) -> new DummyShardLock(id)));
|
assertFalse(StoreUtils.canOpenIndex(logger, tempDir, shardId, (id, l, d) -> new DummyShardLock(id)));
|
||||||
store.close();
|
store.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2462,7 +2462,7 @@ public final class InternalTestCluster extends TestCluster {
|
|||||||
Set<ShardId> shardIds = env.lockedShards();
|
Set<ShardId> shardIds = env.lockedShards();
|
||||||
for (ShardId id : shardIds) {
|
for (ShardId id : shardIds) {
|
||||||
try {
|
try {
|
||||||
env.shardLock(id, TimeUnit.SECONDS.toMillis(5)).close();
|
env.shardLock(id, "InternalTestCluster assert after test", TimeUnit.SECONDS.toMillis(5)).close();
|
||||||
} catch (ShardLockObtainFailedException ex) {
|
} catch (ShardLockObtainFailedException ex) {
|
||||||
fail("Shard " + id + " is still locked after 5 sec waiting");
|
fail("Shard " + id + " is still locked after 5 sec waiting");
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user