[CORE] Intorduce shards level locks to prevent concurrent shard modifications

Today it's possible that the data directory for a single shard is used by more than on
IndexShard->Store instances. While one shard is already closed but has a concurrent recovery
running and a new shard is creating it's engine files can conflict and data can potentially
be lost. We also remove shards data without checking if there are still users of the files
or if files are still open which can cause pending writes / flushes or the delete operation
to fail. If the latter is the case the index might be treated as a dangeling index and is brought
back to life at a later point in time.

This commit introduces a shard level lock that prevents modifications to the shard data
while it's still in use. Locks are created per shard and maintined in NodeEnvironment.java.
In contrast to most java concurrency primitives those locks are not reentrant.

This commit also adds infrastructure that checks if all shard locks are released after tests.
This commit is contained in:
Simon Willnauer 2014-11-16 14:24:29 +01:00
parent 37661aed60
commit 1c64a113de
35 changed files with 1364 additions and 406 deletions

View File

@ -53,7 +53,6 @@ import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.regex.Regex; import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentParser;

View File

@ -82,6 +82,7 @@ public class FileSystemUtils {
* in the input array using {@link java.io.File#toPath()} * in the input array using {@link java.io.File#toPath()}
* @param files the files to get paths for * @param files the files to get paths for
*/ */
@Deprecated // this is only a transition API
public static Path[] toPaths(File... files) { public static Path[] toPaths(File... files) {
Path[] paths = new Path[files.length]; Path[] paths = new Path[files.length];
for (int i = 0; i < files.length; i++) { for (int i = 0; i < files.length; i++) {

View File

@ -19,79 +19,80 @@
package org.elasticsearch.env; package org.elasticsearch.env;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import com.google.common.primitives.Ints; import com.google.common.primitives.Ints;
import org.apache.lucene.store.Lock; import org.apache.lucene.store.Lock;
import org.apache.lucene.store.LockObtainFailedException;
import org.apache.lucene.store.NativeFSLockFactory; import org.apache.lucene.store.NativeFSLockFactory;
import org.apache.lucene.util.IOUtils; import org.apache.lucene.util.IOUtils;
import org.elasticsearch.ElasticsearchIllegalStateException; import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.FileSystemUtils;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.index.Index; import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardId;
import java.io.Closeable;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.nio.file.AtomicMoveNotSupportedException; import java.nio.file.*;
import java.nio.file.Files; import java.util.*;
import java.nio.file.Path; import java.util.concurrent.*;
import java.nio.file.StandardCopyOption;
import java.nio.file.attribute.FileAttribute;
import java.util.Arrays;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
/** /**
* * A component that holds all data paths for a single node.
*/ */
public class NodeEnvironment extends AbstractComponent { public class NodeEnvironment extends AbstractComponent implements Closeable{
private final File[] nodeFiles;
private final File[] nodeIndicesLocations;
/* ${data.paths}/nodes/{node.id} */
private final Path[] nodePaths;
/* ${data.paths}/nodes/{node.id}/indices */
private final Path[] nodeIndicesPaths;
private final Lock[] locks; private final Lock[] locks;
private final int localNodeId; private final int localNodeId;
private final AtomicBoolean closed = new AtomicBoolean(false); private final AtomicBoolean closed = new AtomicBoolean(false);
private final Map<ShardId, InternalShardLock> shardLocks = new HashMap<>();
@Inject @Inject
public NodeEnvironment(Settings settings, Environment environment) { public NodeEnvironment(Settings settings, Environment environment) throws IOException {
super(settings); super(settings);
if (!DiscoveryNode.nodeRequiresLocalStorage(settings)) { if (!DiscoveryNode.nodeRequiresLocalStorage(settings)) {
nodeFiles = null; nodePaths = null;
nodeIndicesLocations = null; nodeIndicesPaths = null;
locks = null; locks = null;
localNodeId = -1; localNodeId = -1;
return; return;
} }
File[] nodesFiles = new File[environment.dataWithClusterFiles().length]; final Path[] nodePaths = new Path[environment.dataWithClusterFiles().length];
Lock[] locks = new Lock[environment.dataWithClusterFiles().length]; final Lock[] locks = new Lock[environment.dataWithClusterFiles().length];
int localNodeId = -1; int localNodeId = -1;
IOException lastException = null; IOException lastException = null;
int maxLocalStorageNodes = settings.getAsInt("node.max_local_storage_nodes", 50); int maxLocalStorageNodes = settings.getAsInt("node.max_local_storage_nodes", 50);
for (int possibleLockId = 0; possibleLockId < maxLocalStorageNodes; possibleLockId++) { for (int possibleLockId = 0; possibleLockId < maxLocalStorageNodes; possibleLockId++) {
for (int dirIndex = 0; dirIndex < environment.dataWithClusterFiles().length; dirIndex++) { for (int dirIndex = 0; dirIndex < environment.dataWithClusterFiles().length; dirIndex++) {
File dir = new File(new File(environment.dataWithClusterFiles()[dirIndex], "nodes"), Integer.toString(possibleLockId)); Path dir = environment.dataWithClusterFiles()[dirIndex].toPath().resolve(Paths.get("nodes", Integer.toString(possibleLockId)));
if (!dir.exists()) { if (Files.exists(dir) == false) {
FileSystemUtils.mkdirs(dir); Files.createDirectories(dir);
} }
logger.trace("obtaining node lock on {} ...", dir.getAbsolutePath()); logger.trace("obtaining node lock on {} ...", dir.toAbsolutePath());
try { try {
NativeFSLockFactory lockFactory = new NativeFSLockFactory(dir.toPath()); NativeFSLockFactory lockFactory = new NativeFSLockFactory(dir);
Lock tmpLock = lockFactory.makeLock("node.lock"); Lock tmpLock = lockFactory.makeLock("node.lock");
boolean obtained = tmpLock.obtain(); boolean obtained = tmpLock.obtain();
if (obtained) { if (obtained) {
locks[dirIndex] = tmpLock; locks[dirIndex] = tmpLock;
nodesFiles[dirIndex] = dir; nodePaths[dirIndex] = dir;
localNodeId = possibleLockId; localNodeId = possibleLockId;
} else { } else {
logger.trace("failed to obtain node lock on {}", dir.getAbsolutePath()); logger.trace("failed to obtain node lock on {}", dir.toAbsolutePath());
// release all the ones that were obtained up until now // release all the ones that were obtained up until now
for (int i = 0; i < locks.length; i++) { for (int i = 0; i < locks.length; i++) {
if (locks[i] != null) { if (locks[i] != null) {
@ -102,8 +103,8 @@ public class NodeEnvironment extends AbstractComponent {
break; break;
} }
} catch (IOException e) { } catch (IOException e) {
logger.trace("failed to obtain node lock on {}", e, dir.getAbsolutePath()); logger.trace("failed to obtain node lock on {}", e, dir.toAbsolutePath());
lastException = new IOException("failed to obtain lock on " + dir.getAbsolutePath(), e); lastException = new IOException("failed to obtain lock on " + dir.toAbsolutePath(), e);
// release all the ones that were obtained up until now // release all the ones that were obtained up until now
for (int i = 0; i < locks.length; i++) { for (int i = 0; i < locks.length; i++) {
IOUtils.closeWhileHandlingException(locks[i]); IOUtils.closeWhileHandlingException(locks[i]);
@ -123,21 +124,205 @@ public class NodeEnvironment extends AbstractComponent {
this.localNodeId = localNodeId; this.localNodeId = localNodeId;
this.locks = locks; this.locks = locks;
this.nodeFiles = nodesFiles; this.nodePaths = nodePaths;
if (logger.isDebugEnabled()) { if (logger.isDebugEnabled()) {
logger.debug("using node location [{}], local_node_id [{}]", nodesFiles, localNodeId); logger.debug("using node location [{}], local_node_id [{}]", nodePaths, localNodeId);
} }
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
StringBuilder sb = new StringBuilder("node data locations details:\n"); StringBuilder sb = new StringBuilder("node data locations details:\n");
for (File file : nodesFiles) { for (Path file : nodePaths) {
sb.append(" -> ").append(file.getAbsolutePath()).append(", free_space [").append(new ByteSizeValue(file.getFreeSpace())).append("], usable_space [").append(new ByteSizeValue(file.getUsableSpace())).append("]\n"); sb.append(" -> ").append(file.toAbsolutePath()).append(", free_space [").append(new ByteSizeValue(Files.getFileStore(file).getUnallocatedSpace())).append("], usable_space [").append(new ByteSizeValue(Files.getFileStore(file).getUsableSpace())).append("]\n");
} }
logger.trace(sb.toString()); logger.trace(sb.toString());
} }
this.nodeIndicesLocations = new File[nodeFiles.length]; this.nodeIndicesPaths = new Path[nodePaths.length];
for (int i = 0; i < nodeFiles.length; i++) { for (int i = 0; i < nodePaths.length; i++) {
nodeIndicesLocations[i] = new File(nodeFiles[i], "indices"); nodeIndicesPaths[i] = nodePaths[i].resolve("indices");
}
}
/**
* Deletes a shard data directory iff the shards locks were successfully acquired.
*
* @param shardId the id of the shard to delete to delete
* @throws IOException if an IOException occurs
*/
public void deleteShardDirectorySafe(ShardId shardId) throws IOException {
final Path[] paths = shardPaths(shardId);
try (Closeable lock = shardLock(shardId)) {
IOUtils.rm(paths);
}
}
/**
* Deletes an indexes data directory recursively iff all of the indexes
* shards locks were successfully acquired. If any of the indexes shard directories can't be locked
* non of the shards will be deleted
*
* @param index the index to delete
* @throws Exception if any of the shards data directories can't be locked or deleted
*/
public void deleteIndexDirectorySafe(Index index) throws IOException {
final List<ShardLock> locks = lockAllForIndex(index);
try {
final Path[] indexPaths = new Path[nodeIndicesPaths.length];
for (int i = 0; i < indexPaths.length; i++) {
indexPaths[i] = nodeIndicesPaths[i].resolve(index.name());
}
IOUtils.rm(indexPaths);
} finally {
IOUtils.closeWhileHandlingException(locks);
}
}
/**
* Tries to lock all local shards for the given index. If any of the shard locks can't be acquired
* an {@link LockObtainFailedException} is thrown and all previously acquired locks are released.
*
* @param index the index to lock shards for
* @return the {@link ShardLock} instances for this index.
* @throws IOException if an IOException occurs.
*/
public List<ShardLock> lockAllForIndex(Index index) throws IOException {
Set<ShardId> allShardIds = findAllShardIds(index);
List<ShardLock> allLocks = new ArrayList<>();
boolean success = false;
try {
for (ShardId shardId : allShardIds) {
allLocks.add(shardLock(shardId));
}
success = true;
} finally {
if (success == false) {
IOUtils.closeWhileHandlingException(allLocks);
}
}
return allLocks;
}
/**
* Tries to lock the given shards ID. A shard lock is required to perform any kind of
* write operation on a shards data directory like deleting files, creating a new index writer
* or recover from a different shard instance into it. If the shard lock can not be acquired
* an {@link LockObtainFailedException} is thrown.
*
* Note: this method will return immediately if the lock can't be acquired.
*
* @param id the shard ID to lock
* @return the shard lock. Call {@link ShardLock#close()} to release the lock
* @throws IOException if an IOException occurs.
*/
public ShardLock shardLock(ShardId id) throws IOException {
return shardLock(id, 0);
}
/**
* Tries to lock the given shards ID. A shard lock is required to perform any kind of
* write operation on a shards data directory like deleting files, creating a new index writer
* or recover from a different shard instance into it. If the shard lock can not be acquired
* an {@link org.apache.lucene.store.LockObtainFailedException} is thrown
* @param id the shard ID to lock
* @param lockTimeoutMS the lock timeout in milliseconds
* @return the shard lock. Call {@link ShardLock#close()} to release the lock
* @throws IOException if an IOException occurs.
*/
public ShardLock shardLock(final ShardId id, long lockTimeoutMS) throws IOException {
final InternalShardLock shardLock;
final boolean acquired;
synchronized (shardLocks) {
if (shardLocks.containsKey(id)) {
shardLock = shardLocks.get(id);
shardLock.incWaitCount();
acquired = false;
} else {
shardLock = new InternalShardLock(id);
shardLocks.put(id, shardLock);
acquired = true;
}
}
if (acquired == false) {
boolean success = false;
try {
shardLock.acquire(lockTimeoutMS);
success = true;
} finally {
if (success == false) {
shardLock.decWaitCount();
}
}
}
return new ShardLock(id) { // new instance prevents double closing
@Override
protected void closeInternal() {
shardLock.release();
}
};
}
/**
* Returns all currently lock shards
*/
public Set<ShardId> lockedShards() {
synchronized (this) {
ImmutableSet.Builder<ShardId> builder = ImmutableSet.builder();
return builder.addAll(shardLocks.keySet()).build();
}
}
private final class InternalShardLock {
/*
* This class holds a mutex for exclusive access and timeout / wait semantics
* and a reference count to cleanup the shard lock instance form the internal data
* structure if nobody is waiting for it. the wait count is guarded by the same lock
* that is used to mutate the map holding the shard locks to ensure exclusive access
*/
private final Semaphore mutex = new Semaphore(1);
private int waitCount = 1; // guarded by shardLocks
private ShardId shardId;
InternalShardLock(ShardId id) {
shardId = id;
mutex.acquireUninterruptibly();
}
protected void release() {
mutex.release();
decWaitCount();
}
void incWaitCount() {
synchronized (shardLocks) {
assert waitCount > 0 : "waitCount is " + waitCount + " but should be > 0";
waitCount++;
}
}
private void decWaitCount() {
synchronized (shardLocks) {
assert waitCount > 0 : "waitCount is " + waitCount + " but should be > 0";
if (--waitCount == 0) {
InternalShardLock remove = shardLocks.remove(shardId);
assert remove != null : "Removed lock was null";
}
}
}
void acquire(long timeoutInMillis) throws LockObtainFailedException{
try {
if (mutex.tryAcquire(timeoutInMillis, TimeUnit.MILLISECONDS) == false) {
throw new LockObtainFailedException("Can't lock shard " + shardId + ", timed out after " + timeoutInMillis + "ms");
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new LockObtainFailedException("Can't lock shard " + shardId + ", interrupted", e);
}
} }
} }
@ -146,87 +331,119 @@ public class NodeEnvironment extends AbstractComponent {
} }
public boolean hasNodeFile() { public boolean hasNodeFile() {
return nodeFiles != null && locks != null; return nodePaths != null && locks != null;
} }
public File[] nodeDataLocations() { /**
* Returns an array of all of the nodes data locations.
* @throws org.elasticsearch.ElasticsearchIllegalStateException if the node is not configured to store local locations
*/
public Path[] nodeDataPaths() {
assert assertEnvIsLocked(); assert assertEnvIsLocked();
if (nodeFiles == null || locks == null) { if (nodePaths == null || locks == null) {
throw new ElasticsearchIllegalStateException("node is not configured to store local location"); throw new ElasticsearchIllegalStateException("node is not configured to store local location");
} }
return nodeFiles; return nodePaths;
} }
public File[] indicesLocations() { /**
assert assertEnvIsLocked(); * Returns an array of all of the nodes data locations.
return nodeIndicesLocations; * @deprecated use {@link #nodeDataPaths()} instead
*/
@Deprecated
public File[] nodeDataLocations() {
return toFiles(nodeDataPaths());
} }
/**
* Returns all data paths for the given index.
* @deprecated use {@link #indexPaths(org.elasticsearch.index.Index)} instead
*/
@Deprecated
public File[] indexLocations(Index index) { public File[] indexLocations(Index index) {
assert assertEnvIsLocked(); return toFiles(indexPaths(index));
File[] indexLocations = new File[nodeFiles.length];
for (int i = 0; i < nodeFiles.length; i++) {
indexLocations[i] = new File(new File(nodeFiles[i], "indices"), index.name());
}
return indexLocations;
} }
/**
* Returns all data paths for the given shards ID
* @deprecated use {@link #shardPaths(org.elasticsearch.index.shard.ShardId)} instead
*/
@Deprecated
public File[] shardLocations(ShardId shardId) { public File[] shardLocations(ShardId shardId) {
return toFiles(shardPaths(shardId));
}
/**
* Returns all data paths for the given index.
*/
public Path[] indexPaths(Index index) {
assert assertEnvIsLocked(); assert assertEnvIsLocked();
File[] shardLocations = new File[nodeFiles.length]; Path[] indexPaths = new Path[nodeIndicesPaths.length];
for (int i = 0; i < nodeFiles.length; i++) { for (int i = 0; i < nodeIndicesPaths.length; i++) {
shardLocations[i] = new File(new File(new File(nodeFiles[i], "indices"), shardId.index().name()), Integer.toString(shardId.id())); indexPaths[i] = nodeIndicesPaths[i].resolve(index.name());
}
return indexPaths;
}
/**
* Returns all data paths for the given shards ID
*/
public Path[] shardPaths(ShardId shardId) {
assert assertEnvIsLocked();
final Path[] nodePaths = nodeDataPaths();
final Path[] shardLocations = new Path[nodePaths.length];
for (int i = 0; i < nodePaths.length; i++) {
shardLocations[i] = nodePaths[i].resolve(Paths.get("indices", shardId.index().name(), Integer.toString(shardId.id())));
} }
return shardLocations; return shardLocations;
} }
public Set<String> findAllIndices() throws Exception { public Set<String> findAllIndices() throws Exception {
if (nodeFiles == null || locks == null) { if (nodePaths == null || locks == null) {
throw new ElasticsearchIllegalStateException("node is not configured to store local location"); throw new ElasticsearchIllegalStateException("node is not configured to store local location");
} }
assert assertEnvIsLocked(); assert assertEnvIsLocked();
Set<String> indices = Sets.newHashSet(); Set<String> indices = Sets.newHashSet();
for (File indicesLocation : nodeIndicesLocations) { for (Path indicesLocation : nodeIndicesPaths) {
File[] indicesList = indicesLocation.listFiles();
if (indicesList == null) { if (Files.exists(indicesLocation) && Files.isDirectory(indicesLocation)) {
continue; try (DirectoryStream<Path> stream = Files.newDirectoryStream(indicesLocation)) {
for (Path index : stream) {
if (Files.isDirectory(index)) {
indices.add(index.getFileName().toString());
}
} }
for (File indexLocation : indicesList) {
if (indexLocation.isDirectory()) {
indices.add(indexLocation.getName());
} }
} }
} }
return indices; return indices;
} }
public Set<ShardId> findAllShardIds() throws Exception { /**
if (nodeFiles == null || locks == null) { * Tries to find all allocated shards for the given index or for all indices iff the given index is <code>null</code>
* on the current node. NOTE: This methods is prone to race-conditions on the filesystem layer since it might not
* see directories created concurrently or while it's traversing.
* @param index the index to filter shards for or <code>null</code> if all shards for all indices should be listed
* @return a set of shard IDs
* @throws IOException if an IOException occurs
*/
public Set<ShardId> findAllShardIds(@Nullable final Index index) throws IOException {
if (nodePaths == null || locks == null) {
throw new ElasticsearchIllegalStateException("node is not configured to store local location"); throw new ElasticsearchIllegalStateException("node is not configured to store local location");
} }
assert assertEnvIsLocked(); assert assertEnvIsLocked();
Set<ShardId> shardIds = Sets.newHashSet(); return findAllShardIds(index == null ? null : index.getName(), nodeIndicesPaths);
for (File indicesLocation : nodeIndicesLocations) {
File[] indicesList = indicesLocation.listFiles();
if (indicesList == null) {
continue;
} }
for (File indexLocation : indicesList) {
if (!indexLocation.isDirectory()) { private static Set<ShardId> findAllShardIds(@Nullable final String index, Path... locations) throws IOException {
continue; final Set<ShardId> shardIds = Sets.newHashSet();
for (final Path location : locations) {
if (Files.exists(location) && Files.isDirectory(location)) {
try (DirectoryStream<Path> indexStream = Files.newDirectoryStream(location)) {
for (Path indexPath : indexStream) {
if (index == null || index.equals(indexPath.getFileName().toString())) {
shardIds.addAll(findAllShardsForIndex(indexPath));
} }
String indexName = indexLocation.getName();
File[] shardsList = indexLocation.listFiles();
if (shardsList == null) {
continue;
}
for (File shardLocation : shardsList) {
if (!shardLocation.isDirectory()) {
continue;
}
Integer shardId = Ints.tryParse(shardLocation.getName());
if (shardId != null) {
shardIds.add(new ShardId(indexName, shardId));
} }
} }
} }
@ -234,6 +451,36 @@ public class NodeEnvironment extends AbstractComponent {
return shardIds; return shardIds;
} }
private static Set<ShardId> findAllShardsForIndex(Path indexPath) throws IOException {
Set<ShardId> shardIds = new HashSet<>();
if (Files.exists(indexPath) && Files.isDirectory(indexPath)) {
try (DirectoryStream<Path> stream = Files.newDirectoryStream(indexPath)) {
String currentIndex = indexPath.getFileName().toString();
for (Path shardPath : stream) {
if (Files.exists(shardPath) && Files.isDirectory(shardPath)) {
Integer shardId = Ints.tryParse(shardPath.getFileName().toString());
if (shardId != null) {
shardIds.add(new ShardId(currentIndex, shardId));
}
}
}
}
}
return shardIds;
}
/**
* Tries to find all allocated shards for all indices iff the given index on the current node. NOTE: This methods
* is prone to race-conditions on the filesystem layer since it might not see directories created concurrently or
* while it's traversing.
*
* @return a set of shard IDs
* @throws IOException if an IOException occurs
*/
public Set<ShardId> findAllShardIds() throws IOException {
return findAllShardIds(null);
}
public void close() { public void close() {
if (closed.compareAndSet(false, true) && locks != null) { if (closed.compareAndSet(false, true) && locks != null) {
for (Lock lock : locks) { for (Lock lock : locks) {
@ -269,19 +516,37 @@ public class NodeEnvironment extends AbstractComponent {
* This method cleans up all files even in the case of an error. * This method cleans up all files even in the case of an error.
*/ */
public void ensureAtomicMoveSupported() throws IOException { public void ensureAtomicMoveSupported() throws IOException {
for (File file : nodeFiles) { final Path[] nodePaths = nodeDataPaths();
assert file.isDirectory(); for (Path directory : nodePaths) {
final Path src = new File(file, "__es__.tmp").toPath(); assert Files.isDirectory(directory) : directory + " is not a directory";
final Path src = directory.resolve("__es__.tmp");
Files.createFile(src); Files.createFile(src);
final Path target = new File(file, "__es__.final").toPath(); final Path target = directory.resolve("__es__.final");
try { try {
Files.move(src, target, StandardCopyOption.ATOMIC_MOVE); Files.move(src, target, StandardCopyOption.ATOMIC_MOVE);
} catch (AtomicMoveNotSupportedException ex) { } catch (AtomicMoveNotSupportedException ex) {
throw new ElasticsearchIllegalStateException("atomic_move is not supported by the filesystem on path [" + file.getCanonicalPath() + "] atomic_move is required for elasticsearch to work correctly.", ex); throw new ElasticsearchIllegalStateException("atomic_move is not supported by the filesystem on path [" + directory + "] atomic_move is required for elasticsearch to work correctly.", ex);
} finally { } finally {
Files.deleteIfExists(src); Files.deleteIfExists(src);
Files.deleteIfExists(target); Files.deleteIfExists(target);
} }
} }
} }
/**
* Returns an array of {@link File} build from the correspondent element
* in the input array using {@link java.nio.file.Path#toFile()} )}
* @param files the files to get paths for
*/
@Deprecated // this is only a transition API
private static File[] toFiles(Path... files) {
File[] paths = new File[files.length];
for (int i = 0; i < files.length; i++) {
paths[i] = files[i].toFile();
}
return paths;
}
} }

View File

@ -0,0 +1,77 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.env;
import org.apache.lucene.store.Lock;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.index.shard.ShardId;
import java.io.Closeable;
import java.io.IOException;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* A shard lock guarantees exclusive access to a shards data
* directory. Internal processes should acquire a lock on a shard
* before executing any write operations on the shards data directory.
*
* @see org.elasticsearch.env.NodeEnvironment
*/
public abstract class ShardLock implements Closeable {
private final ShardId shardId;
private final AtomicBoolean closed = new AtomicBoolean(false);
public ShardLock(ShardId id) {
this.shardId = id;
}
/**
* Returns the locks shards Id.
*/
public final ShardId getShardId() {
return shardId;
}
@Override
public final void close() throws IOException {
if (this.closed.compareAndSet(false, true)) {
closeInternal();
}
}
protected abstract void closeInternal();
/**
* Returns true if this lock is still open ie. has not been closed yet.
*/
public final boolean isOpen() {
return closed.get() == false;
}
@Override
public String toString() {
return "ShardLock{" +
"shardId=" + shardId +
'}';
}
}

View File

@ -21,6 +21,7 @@ package org.elasticsearch.gateway.local.state.meta;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import org.apache.lucene.store.LockObtainFailedException;
import org.apache.lucene.util.IOUtils; import org.apache.lucene.util.IOUtils;
import org.elasticsearch.ElasticsearchIllegalArgumentException; import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.ElasticsearchIllegalStateException; import org.elasticsearch.ElasticsearchIllegalStateException;
@ -36,7 +37,6 @@ import org.elasticsearch.cluster.routing.operation.hash.djb.DjbHashFunction;
import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.FileSystemUtils;
import org.elasticsearch.common.io.Streams; import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
@ -246,9 +246,11 @@ public class LocalGatewayMetaState extends AbstractComponent implements ClusterS
logger.debug("[{}] deleting index that is no longer part of the metadata (indices: [{}])", current.index(), newMetaData.indices().keys()); logger.debug("[{}] deleting index that is no longer part of the metadata (indices: [{}])", current.index(), newMetaData.indices().keys());
if (nodeEnv.hasNodeFile()) { if (nodeEnv.hasNodeFile()) {
try { try {
IOUtils.rm(FileSystemUtils.toPaths(nodeEnv.indexLocations(new Index(current.index())))); nodeEnv.deleteIndexDirectorySafe(new Index(current.index()));
} catch (LockObtainFailedException ex) {
logger.debug("[{}] failed to delete index - at least one shards is still locked", ex, current.index());
} catch (Exception ex) { } catch (Exception ex) {
logger.debug("[{}] failed to delete index", ex, current.index()); logger.warn("[{}] failed to delete index", ex, current.index());
} }
} }
try { try {
@ -282,16 +284,24 @@ public class LocalGatewayMetaState extends AbstractComponent implements ClusterS
// already dangling, continue // already dangling, continue
continue; continue;
} }
IndexMetaData indexMetaData = loadIndexState(indexName); final IndexMetaData indexMetaData = loadIndexState(indexName);
if (indexMetaData != null) { if (indexMetaData != null) {
if (danglingTimeout.millis() == 0) { if (danglingTimeout.millis() == 0) {
logger.info("[{}] dangling index, exists on local file system, but not in cluster metadata, timeout set to 0, deleting now", indexName); logger.info("[{}] dangling index, exists on local file system, but not in cluster metadata, timeout set to 0, deleting now", indexName);
try { try {
IOUtils.rm(FileSystemUtils.toPaths(nodeEnv.indexLocations(new Index(indexName)))); nodeEnv.deleteIndexDirectorySafe(new Index(indexName));
} catch (LockObtainFailedException ex) {
logger.debug("[{}] failed to delete index - at least one shards is still locked", ex, indexName);
} catch (Exception ex) { } catch (Exception ex) {
logger.debug("[{}] failed to delete dangling index", ex, indexName); logger.warn("[{}] failed to delete dangling index", ex, indexName);
} }
} else { } else {
try { // the index deletion might not have worked due to shards still being locked
IOUtils.closeWhileHandlingException(nodeEnv.lockAllForIndex(new Index(indexName)));
} catch (IOException ex) {
logger.warn("[{}] skipping locked dangling index, exists on local file system, but not in cluster metadata, auto import to cluster state is set to [{}]", ex, indexName, autoImportDangled);
continue;
}
logger.info("[{}] dangling index, exists on local file system, but not in cluster metadata, scheduling to delete in [{}], auto import to cluster state [{}]", indexName, danglingTimeout, autoImportDangled); logger.info("[{}] dangling index, exists on local file system, but not in cluster metadata, scheduling to delete in [{}], auto import to cluster state [{}]", indexName, danglingTimeout, autoImportDangled);
danglingIndices.put(indexName, new DanglingIndex(indexName, threadPool.schedule(danglingTimeout, ThreadPool.Names.SAME, new RemoveDanglingIndex(indexName)))); danglingIndices.put(indexName, new DanglingIndex(indexName, threadPool.schedule(danglingTimeout, ThreadPool.Names.SAME, new RemoveDanglingIndex(indexName))));
} }
@ -591,7 +601,7 @@ public class LocalGatewayMetaState extends AbstractComponent implements ClusterS
} }
logger.warn("[{}] deleting dangling index", index); logger.warn("[{}] deleting dangling index", index);
try { try {
IOUtils.rm(FileSystemUtils.toPaths(nodeEnv.indexLocations(new Index(index)))); nodeEnv.deleteIndexDirectorySafe(new Index(index));
} catch (Exception ex) { } catch (Exception ex) {
logger.debug("failed to delete dangling index", ex); logger.debug("failed to delete dangling index", ex);
} }

View File

@ -19,7 +19,6 @@
package org.elasticsearch.gateway.none; package org.elasticsearch.gateway.none;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.cluster.*; import org.elasticsearch.cluster.*;
import org.elasticsearch.cluster.action.index.NodeIndexDeletedAction; import org.elasticsearch.cluster.action.index.NodeIndexDeletedAction;
@ -29,7 +28,6 @@ import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.inject.Module; import org.elasticsearch.common.inject.Module;
import org.elasticsearch.common.io.FileSystemUtils;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.gateway.Gateway; import org.elasticsearch.gateway.Gateway;
@ -44,7 +42,6 @@ public class NoneGateway extends AbstractLifecycleComponent<Gateway> implements
public static final String TYPE = "none"; public static final String TYPE = "none";
private final ClusterService clusterService;
private final NodeEnvironment nodeEnv; private final NodeEnvironment nodeEnv;
private final NodeIndexDeletedAction nodeIndexDeletedAction; private final NodeIndexDeletedAction nodeIndexDeletedAction;
private final ClusterName clusterName; private final ClusterName clusterName;
@ -55,7 +52,6 @@ public class NoneGateway extends AbstractLifecycleComponent<Gateway> implements
@Inject @Inject
public NoneGateway(Settings settings, ClusterService clusterService, NodeEnvironment nodeEnv, NodeIndexDeletedAction nodeIndexDeletedAction, ClusterName clusterName) { public NoneGateway(Settings settings, ClusterService clusterService, NodeEnvironment nodeEnv, NodeIndexDeletedAction nodeIndexDeletedAction, ClusterName clusterName) {
super(settings); super(settings);
this.clusterService = clusterService;
this.nodeEnv = nodeEnv; this.nodeEnv = nodeEnv;
this.nodeIndexDeletedAction = nodeIndexDeletedAction; this.nodeIndexDeletedAction = nodeIndexDeletedAction;
this.clusterName = clusterName; this.clusterName = clusterName;
@ -119,7 +115,7 @@ public class NoneGateway extends AbstractLifecycleComponent<Gateway> implements
logger.debug("[{}] deleting index that is no longer part of the metadata (indices: [{}])", current.index(), newMetaData.indices().keys()); logger.debug("[{}] deleting index that is no longer part of the metadata (indices: [{}])", current.index(), newMetaData.indices().keys());
if (nodeEnv.hasNodeFile()) { if (nodeEnv.hasNodeFile()) {
try { try {
IOUtils.rm(FileSystemUtils.toPaths(nodeEnv.indexLocations(new Index(current.index())))); nodeEnv.deleteIndexDirectorySafe(new Index(current.index()));
} catch (Exception ex) { } catch (Exception ex) {
logger.debug("failed to delete shard locations", ex); logger.debug("failed to delete shard locations", ex);
} }

View File

@ -292,7 +292,7 @@ public class LocalIndexShardGateway extends AbstractIndexShardComponent implemen
indexShard.performRecoveryFinalization(true); indexShard.performRecoveryFinalization(true);
try { try {
Files.delete(recoveringTranslogFile.toPath()); Files.deleteIfExists(recoveringTranslogFile.toPath());
} catch (Exception ex) { } catch (Exception ex) {
logger.debug("Failed to delete recovering translog file {}", ex, recoveringTranslogFile); logger.debug("Failed to delete recovering translog file {}", ex, recoveringTranslogFile);
} }

View File

@ -106,4 +106,5 @@ public interface IndexService extends IndexComponent, Iterable<IndexShard> {
Injector shardInjectorSafe(int shardId) throws IndexShardMissingException; Injector shardInjectorSafe(int shardId) throws IndexShardMissingException;
String indexUUID(); String indexUUID();
} }

View File

@ -21,14 +21,19 @@ package org.elasticsearch.index.service;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet; import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import com.google.common.collect.UnmodifiableIterator; import com.google.common.collect.UnmodifiableIterator;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchIllegalStateException; import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.inject.*; import org.elasticsearch.common.inject.*;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.CountDown;
import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.env.ShardLock;
import org.elasticsearch.index.*; import org.elasticsearch.index.*;
import org.elasticsearch.index.aliases.IndexAliasesService; import org.elasticsearch.index.aliases.IndexAliasesService;
import org.elasticsearch.index.analysis.AnalysisService; import org.elasticsearch.index.analysis.AnalysisService;
@ -75,15 +80,22 @@ import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogModule; import org.elasticsearch.index.translog.TranslogModule;
import org.elasticsearch.index.translog.TranslogService; import org.elasticsearch.index.translog.TranslogService;
import org.elasticsearch.indices.IndicesLifecycle; import org.elasticsearch.indices.IndicesLifecycle;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.InternalIndicesLifecycle; import org.elasticsearch.indices.InternalIndicesLifecycle;
import org.elasticsearch.plugins.PluginsService; import org.elasticsearch.plugins.PluginsService;
import org.elasticsearch.plugins.ShardsPluginsModule; import org.elasticsearch.plugins.ShardsPluginsModule;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import static com.google.common.collect.Maps.newHashMap; import static com.google.common.collect.Maps.newHashMap;
import static org.elasticsearch.common.collect.MapBuilder.newMapBuilder; import static org.elasticsearch.common.collect.MapBuilder.newMapBuilder;
@ -127,11 +139,13 @@ public class InternalIndexService extends AbstractIndexComponent implements Inde
private final IndexSettingsService settingsService; private final IndexSettingsService settingsService;
private final NodeEnvironment nodeEnv;
private volatile ImmutableMap<Integer, Injector> shardsInjectors = ImmutableMap.of(); private volatile ImmutableMap<Integer, Injector> shardsInjectors = ImmutableMap.of();
private volatile ImmutableMap<Integer, IndexShard> shards = ImmutableMap.of(); private volatile ImmutableMap<Integer, IndexShard> shards = ImmutableMap.of();
private volatile boolean closed = false; private final AtomicBoolean closed = new AtomicBoolean(false);
@Inject @Inject
public InternalIndexService(Injector injector, Index index, @IndexSettings Settings indexSettings, NodeEnvironment nodeEnv, ThreadPool threadPool, public InternalIndexService(Injector injector, Index index, @IndexSettings Settings indexSettings, NodeEnvironment nodeEnv, ThreadPool threadPool,
@ -163,6 +177,7 @@ public class InternalIndexService extends AbstractIndexComponent implements Inde
indexCache.filter().setIndexService(this); indexCache.filter().setIndexService(this);
indexFieldData.setIndexService(this); indexFieldData.setIndexService(this);
bitSetFilterCache.setIndexService(this); bitSetFilterCache.setIndexService(this);
this.nodeEnv = nodeEnv;
} }
@Override @Override
@ -264,21 +279,24 @@ public class InternalIndexService extends AbstractIndexComponent implements Inde
return indexEngine; return indexEngine;
} }
public void close(final String reason, @Nullable Executor executor) { public void close(final String reason, @Nullable Executor executor, final IndicesService.IndexCloseListener listener) {
synchronized (this) { if (closed.compareAndSet(false, true)) {
closed = true; final Set<Integer> shardIds = shardIds();
}
Set<Integer> shardIds = shardIds();
final CountDownLatch latch = new CountDownLatch(shardIds.size()); final CountDownLatch latch = new CountDownLatch(shardIds.size());
final IndicesService.IndexCloseListener innerListener = listener == null ? null :
new PerShardIndexCloseListener(shardIds, listener);
for (final int shardId : shardIds) { for (final int shardId : shardIds) {
executor = executor == null ? threadPool.generic() : executor; executor = executor == null ? threadPool.generic() : executor;
executor.execute(new Runnable() { executor.execute(new AbstractRunnable() {
@Override @Override
public void run() { public void onFailure(Throwable t) {
logger.warn("failed to close shard", t);
}
@Override
public void doRun() {
try { try {
removeShard(shardId, reason); removeShard(shardId, reason, innerListener);
} catch (Throwable e) {
logger.warn("failed to close shard", e);
} finally { } finally {
latch.countDown(); latch.countDown();
} }
@ -292,6 +310,7 @@ public class InternalIndexService extends AbstractIndexComponent implements Inde
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
} }
} }
}
@Override @Override
public Injector shardInjector(int shardId) throws ElasticsearchException { public Injector shardInjector(int shardId) throws ElasticsearchException {
@ -319,17 +338,21 @@ public class InternalIndexService extends AbstractIndexComponent implements Inde
* be able to serialize the execution via the cluster state in the future. for now we just * be able to serialize the execution via the cluster state in the future. for now we just
* keep it synced. * keep it synced.
*/ */
if (closed) { if (closed.get()) {
throw new ElasticsearchIllegalStateException("Can't create shard [" + index.name() + "][" + sShardId + "], closed"); throw new ElasticsearchIllegalStateException("Can't create shard [" + index.name() + "][" + sShardId + "], closed");
} }
ShardId shardId = new ShardId(index, sShardId); ShardId shardId = new ShardId(index, sShardId);
ShardLock lock = null;
boolean success = false;
try {
lock = nodeEnv.shardLock(shardId, TimeUnit.SECONDS.toMillis(5));
if (shardsInjectors.containsKey(shardId.id())) { if (shardsInjectors.containsKey(shardId.id())) {
throw new IndexShardAlreadyExistsException(shardId + " already exists"); throw new IndexShardAlreadyExistsException(shardId + " already exists");
} }
indicesLifecycle.beforeIndexShardCreated(shardId); indicesLifecycle.beforeIndexShardCreated(shardId);
logger.debug("creating shard_id [{}]", shardId.id()); logger.debug("creating shard_id {}", shardId);
ModulesBuilder modules = new ModulesBuilder(); ModulesBuilder modules = new ModulesBuilder();
modules.add(new ShardsPluginsModule(indexSettings, pluginsService)); modules.add(new ShardsPluginsModule(indexSettings, pluginsService));
@ -337,7 +360,7 @@ public class InternalIndexService extends AbstractIndexComponent implements Inde
modules.add(new ShardIndexingModule()); modules.add(new ShardIndexingModule());
modules.add(new ShardSearchModule()); modules.add(new ShardSearchModule());
modules.add(new ShardGetModule()); modules.add(new ShardGetModule());
modules.add(new StoreModule(indexSettings, injector.getInstance(IndexStore.class))); modules.add(new StoreModule(indexSettings, injector.getInstance(IndexStore.class), lock));
modules.add(new DeletionPolicyModule(indexSettings)); modules.add(new DeletionPolicyModule(indexSettings));
modules.add(new MergePolicyModule(indexSettings)); modules.add(new MergePolicyModule(indexSettings));
modules.add(new MergeSchedulerModule(indexSettings)); modules.add(new MergeSchedulerModule(indexSettings));
@ -365,20 +388,32 @@ public class InternalIndexService extends AbstractIndexComponent implements Inde
shardsInjectors = newMapBuilder(shardsInjectors).put(shardId.id(), shardInjector).immutableMap(); shardsInjectors = newMapBuilder(shardsInjectors).put(shardId.id(), shardInjector).immutableMap();
IndexShard indexShard = shardInjector.getInstance(IndexShard.class); IndexShard indexShard = shardInjector.getInstance(IndexShard.class);
indicesLifecycle.indexShardStateChanged(indexShard, null, "shard created"); indicesLifecycle.indexShardStateChanged(indexShard, null, "shard created");
indicesLifecycle.afterIndexShardCreated(indexShard); indicesLifecycle.afterIndexShardCreated(indexShard);
shards = newMapBuilder(shards).put(shardId.id(), indexShard).immutableMap(); shards = newMapBuilder(shards).put(shardId.id(), indexShard).immutableMap();
success = true;
return indexShard; return indexShard;
} catch (IOException ex) {
throw new IndexShardCreationException(shardId, ex);
} finally {
if (success == false) {
IOUtils.closeWhileHandlingException(lock);
}
}
} }
@Override @Override
public synchronized void removeShard(int shardId, String reason) throws ElasticsearchException { public void removeShard(int shardId, String reason) throws ElasticsearchException {
removeShard(shardId, reason, null);
}
public synchronized void removeShard(int shardId, String reason, @Nullable final IndicesService.IndexCloseListener listener) throws ElasticsearchException {
boolean listenerPassed = false;
final ShardId sId = new ShardId(index, shardId);
try {
final Injector shardInjector; final Injector shardInjector;
final IndexShard indexShard; final IndexShard indexShard;
final ShardId sId = new ShardId(index, shardId);
Map<Integer, Injector> tmpShardInjectors = newHashMap(shardsInjectors); Map<Integer, Injector> tmpShardInjectors = newHashMap(shardsInjectors);
shardInjector = tmpShardInjectors.remove(shardId); shardInjector = tmpShardInjectors.remove(shardId);
if (shardInjector == null) { if (shardInjector == null) {
@ -457,15 +492,72 @@ public class InternalIndexService extends AbstractIndexComponent implements Inde
// call this before we close the store, so we can release resources for it // call this before we close the store, so we can release resources for it
indicesLifecycle.afterIndexShardClosed(sId, indexShard); indicesLifecycle.afterIndexShardClosed(sId, indexShard);
// if we delete or have no gateway or the store is not persistent, clean the store... // if we delete or have no gateway or the store is not persistent, clean the store...
Store store = shardInjector.getInstance(Store.class); final Store store = shardInjector.getInstance(Store.class);
// and close it // and close it
try { try {
listenerPassed = true;
if (listener == null) {
store.close(); store.close();
} else {
store.close(new Store.OnCloseListener() {
@Override
public void onClose(ShardId shardId) {
listener.onShardClosed(shardId);
}
});
}
} catch (Throwable e) { } catch (Throwable e) {
logger.warn("[{}] failed to close store on shard deletion", e, shardId); logger.warn("[{}] failed to close store on shard deletion", e, shardId);
} }
Injectors.close(injector); Injectors.close(injector);
logger.debug("[{}] closed (reason: [{}])", shardId, reason); logger.debug("[{}] closed (reason: [{}])", shardId, reason);
} catch (Throwable t) {
if (listenerPassed == false && listener != null) { // only notify if the listener wasn't passed to the store
listener.onShardCloseFailed(sId, t);
}
throw t;
}
}
private static final class PerShardIndexCloseListener implements IndicesService.IndexCloseListener {
final CountDown countDown;
final List<Throwable> failures;
private final Set<Integer> shardIds;
private final IndicesService.IndexCloseListener listener;
public PerShardIndexCloseListener(Set<Integer> shardIds, IndicesService.IndexCloseListener listener) {
this.shardIds = shardIds;
this.listener = listener;
countDown = new CountDown(shardIds.size());
failures = new CopyOnWriteArrayList<>();
}
@Override
public void onAllShardsClosed(Index index, List<Throwable> failures) {
assert false : "nobody should call this";
}
@Override
public void onShardClosed(ShardId shardId) {
assert countDown.isCountedDown() == false;
assert shardIds.contains(shardId.getId()) : "Unknown shard id";
listener.onShardClosed(shardId);
if (countDown.countDown()) {
listener.onAllShardsClosed(shardId.index(), failures);
}
}
@Override
public void onShardCloseFailed(ShardId shardId, Throwable t) {
assert countDown.isCountedDown() == false;
assert shardIds.contains(shardId.getId()) : "Unkown shard id";
listener.onShardCloseFailed(shardId, t);
failures.add(t);
if (countDown.countDown()) {
listener.onAllShardsClosed(shardId.index(), failures);
}
}
} }
} }

View File

@ -20,14 +20,31 @@
package org.elasticsearch.index.store; package org.elasticsearch.index.store;
import org.apache.lucene.store.Directory; import org.apache.lucene.store.Directory;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.distributor.Distributor;
import java.io.IOException; import java.io.IOException;
/** /**
*/ */
public interface DirectoryService { public abstract class DirectoryService extends AbstractIndexShardComponent {
Directory[] build() throws IOException; protected DirectoryService(ShardId shardId, @IndexSettings Settings indexSettings) {
super(shardId, indexSettings);
long throttleTimeInNanos(); }
public abstract Directory[] build() throws IOException;
public abstract long throttleTimeInNanos();
/**
* Creates a new Directory from the given distributor.
* The default implementation returns a new {@link org.elasticsearch.index.store.DistributorDirectory}
*/
public Directory newFromDistributor(Distributor distributor) throws IOException {
return new DistributorDirectory(distributor);
}
} }

View File

@ -20,7 +20,6 @@ package org.elasticsearch.index.store;
import org.apache.lucene.store.*; import org.apache.lucene.store.*;
import org.apache.lucene.util.IOUtils; import org.apache.lucene.util.IOUtils;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.math.MathUtils; import org.elasticsearch.common.math.MathUtils;
import org.elasticsearch.index.store.distributor.Distributor; import org.elasticsearch.index.store.distributor.Distributor;
@ -128,9 +127,14 @@ public final class DistributorDirectory extends BaseDirectory {
@Override @Override
public synchronized void close() throws IOException { public synchronized void close() throws IOException {
try {
assert assertConsistency();
} finally {
IOUtils.close(distributor.all()); IOUtils.close(distributor.all());
} }
}
/** /**
* Returns the directory that has previously been associated with this file name. * Returns the directory that has previously been associated with this file name.
* *
@ -183,14 +187,13 @@ public final class DistributorDirectory extends BaseDirectory {
/** /**
* Basic checks to ensure the internal mapping is consistent - should only be used in assertions * Basic checks to ensure the internal mapping is consistent - should only be used in assertions
*/ */
static boolean assertConsistency(ESLogger logger, DistributorDirectory dir) throws IOException { private synchronized boolean assertConsistency() throws IOException {
synchronized (dir) {
boolean consistent = true; boolean consistent = true;
StringBuilder builder = new StringBuilder(); StringBuilder builder = new StringBuilder();
Directory[] all = dir.distributor.all(); Directory[] all = distributor.all();
for (Directory d : all) { for (Directory d : all) {
for (String file : d.listAll()) { for (String file : d.listAll()) {
final Directory directory = dir.nameDirMapping.get(file); final Directory directory = nameDirMapping.get(file);
if (directory == null) { if (directory == null) {
consistent = false; consistent = false;
builder.append("File ").append(file) builder.append("File ").append(file)
@ -205,13 +208,9 @@ public final class DistributorDirectory extends BaseDirectory {
} }
} }
if (consistent == false) {
logger.info(builder.toString());
}
assert consistent : builder.toString(); assert consistent : builder.toString();
return consistent; // return boolean so it can be easily be used in asserts return consistent; // return boolean so it can be easily be used in asserts
} }
}
/** /**
* This inner class is a simple wrapper around the original * This inner class is a simple wrapper around the original

View File

@ -33,8 +33,6 @@ import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings; import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.compress.Compressor;
import org.elasticsearch.common.compress.CompressorFactory;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.Streams; import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.logging.ESLogger;
@ -44,6 +42,7 @@ import org.elasticsearch.common.lucene.store.InputStreamIndexInput;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AbstractRefCounted; import org.elasticsearch.common.util.concurrent.AbstractRefCounted;
import org.elasticsearch.common.util.concurrent.RefCounted; import org.elasticsearch.common.util.concurrent.RefCounted;
import org.elasticsearch.env.ShardLock;
import org.elasticsearch.index.CloseableIndexComponent; import org.elasticsearch.index.CloseableIndexComponent;
import org.elasticsearch.index.codec.CodecService; import org.elasticsearch.index.codec.CodecService;
import org.elasticsearch.index.settings.IndexSettings; import org.elasticsearch.index.settings.IndexSettings;
@ -92,8 +91,8 @@ public class Store extends AbstractIndexShardComponent implements CloseableIndex
private final CodecService codecService; private final CodecService codecService;
private final DirectoryService directoryService; private final DirectoryService directoryService;
private final StoreDirectory directory; private final StoreDirectory directory;
private final DistributorDirectory distributorDirectory;
private final ReentrantReadWriteLock metadataLock = new ReentrantReadWriteLock(); private final ReentrantReadWriteLock metadataLock = new ReentrantReadWriteLock();
private final ShardLock shardLock;
private final AbstractRefCounted refCounter = new AbstractRefCounted("store") { private final AbstractRefCounted refCounter = new AbstractRefCounted("store") {
@Override @Override
@ -102,17 +101,19 @@ public class Store extends AbstractIndexShardComponent implements CloseableIndex
Store.this.closeInternal(); Store.this.closeInternal();
} }
}; };
private volatile OnCloseListener onClose;
@Inject @Inject
public Store(ShardId shardId, @IndexSettings Settings indexSettings, CodecService codecService, DirectoryService directoryService, Distributor distributor) throws IOException { public Store(ShardId shardId, @IndexSettings Settings indexSettings, CodecService codecService, DirectoryService directoryService, Distributor distributor, ShardLock shardLock) throws IOException {
super(shardId, indexSettings); super(shardId, indexSettings);
this.codecService = codecService; this.codecService = codecService;
this.directoryService = directoryService; this.directoryService = directoryService;
this.distributorDirectory = new DistributorDirectory(distributor); this.directory = new StoreDirectory(directoryService.newFromDistributor(distributor));
this.directory = new StoreDirectory(distributorDirectory); this.shardLock = shardLock;
assert shardLock != null;
assert shardLock.getShardId().equals(shardId);
} }
public Directory directory() { public Directory directory() {
ensureOpen(); ensureOpen();
return directory; return directory;
@ -195,7 +196,7 @@ public class Store extends AbstractIndexShardComponent implements CloseableIndex
failIfCorrupted(); failIfCorrupted();
metadataLock.readLock().lock(); metadataLock.readLock().lock();
try { try {
return new MetadataSnapshot(commit, distributorDirectory, logger); return new MetadataSnapshot(commit, directory, logger);
} catch (CorruptIndexException | IndexFormatTooOldException | IndexFormatTooNewException ex) { } catch (CorruptIndexException | IndexFormatTooOldException | IndexFormatTooNewException ex) {
markStoreCorrupted(ex); markStoreCorrupted(ex);
throw ex; throw ex;
@ -258,20 +259,18 @@ public class Store extends AbstractIndexShardComponent implements CloseableIndex
*/ */
public void deleteContent() throws IOException { public void deleteContent() throws IOException {
ensureOpen(); ensureOpen();
final String[] files = distributorDirectory.listAll(); final String[] files = directory.listAll();
IOException lastException = null; final List<IOException> exceptions = new ArrayList<>();
for (String file : files) { for (String file : files) {
try { try {
distributorDirectory.deleteFile(file); directory.deleteFile(file);
} catch (NoSuchFileException | FileNotFoundException e) { } catch (NoSuchFileException | FileNotFoundException e) {
// ignore // ignore
} catch (IOException e) { } catch (IOException e) {
lastException = e; exceptions.add(e);
} }
} }
if (lastException != null) { ExceptionsHelper.rethrowAndSuppress(exceptions);
throw lastException;
}
} }
public StoreStats stats() throws IOException { public StoreStats stats() throws IOException {
@ -281,7 +280,7 @@ public class Store extends AbstractIndexShardComponent implements CloseableIndex
public void renameFile(String from, String to) throws IOException { public void renameFile(String from, String to) throws IOException {
ensureOpen(); ensureOpen();
distributorDirectory.renameFile(from, to); directory.renameFile(from, to);
} }
/** /**
@ -337,17 +336,43 @@ public class Store extends AbstractIndexShardComponent implements CloseableIndex
@Override @Override
public void close() { public void close() {
close(null);
}
/**
* Closes this store and installs the given {@link org.elasticsearch.index.store.Store.OnCloseListener}
* to be notified once all references to this store are released and the store is closed.
*/
public void close(@Nullable OnCloseListener onClose) {
if (isClosed.compareAndSet(false, true)) { if (isClosed.compareAndSet(false, true)) {
assert this.onClose == null : "OnClose listener is already set";
this.onClose = onClose;
// only do this once! // only do this once!
decRef(); decRef();
} }
} }
private void closeInternal() { private void closeInternal() {
final OnCloseListener listener = onClose;
onClose = null;
try { try {
directory.innerClose(); // this closes the distributorDirectory as well directory.innerClose(); // this closes the distributorDirectory as well
} catch (IOException e) { } catch (IOException e) {
logger.debug("failed to close directory", e); logger.debug("failed to close directory", e);
} finally {
try {
IOUtils.closeWhileHandlingException(shardLock);
} finally {
try {
if (listener != null) {
listener.onClose(shardId);
}
} catch (Exception ex){
logger.debug("OnCloseListener threw an exception", ex);
}
}
} }
} }
@ -534,14 +559,9 @@ public class Store extends AbstractIndexShardComponent implements CloseableIndex
} }
private void innerClose() throws IOException { private void innerClose() throws IOException {
try {
assert DistributorDirectory.assertConsistency(logger, distributorDirectory);
} finally {
super.close(); super.close();
} }
}
@Override @Override
public String toString() { public String toString() {
return "store(" + in.toString() + ")"; return "store(" + in.toString() + ")";
@ -837,8 +857,8 @@ public class Store extends AbstractIndexShardComponent implements CloseableIndex
} }
public synchronized void write(Store store) throws IOException { public synchronized void write(Store store) throws IOException {
synchronized (store.distributorDirectory) { synchronized (store.directory) {
Tuple<Map<String, String>, Long> tuple = MetadataSnapshot.readLegacyChecksums(store.distributorDirectory); Tuple<Map<String, String>, Long> tuple = MetadataSnapshot.readLegacyChecksums(store.directory);
tuple.v1().putAll(legacyChecksums); tuple.v1().putAll(legacyChecksums);
if (!tuple.v1().isEmpty()) { if (!tuple.v1().isEmpty()) {
writeChecksums(store.directory, tuple.v1(), tuple.v2()); writeChecksums(store.directory, tuple.v1(), tuple.v2());
@ -1101,4 +1121,17 @@ public class Store extends AbstractIndexShardComponent implements CloseableIndex
directory().sync(Collections.singleton(uuid)); directory().sync(Collections.singleton(uuid));
} }
} }
/**
* A listener that is called once this store is closed and all references are released
*/
public static interface OnCloseListener {
/**
* Called once the store is closed and all references are released.
*
* @param shardId the shard ID the calling store belongs to.
*/
public void onClose(ShardId shardId);
}
} }

View File

@ -21,6 +21,7 @@ package org.elasticsearch.index.store;
import org.elasticsearch.common.inject.AbstractModule; import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.ShardLock;
import org.elasticsearch.index.store.distributor.Distributor; import org.elasticsearch.index.store.distributor.Distributor;
import org.elasticsearch.index.store.distributor.LeastUsedDistributor; import org.elasticsearch.index.store.distributor.LeastUsedDistributor;
import org.elasticsearch.index.store.distributor.RandomWeightedDistributor; import org.elasticsearch.index.store.distributor.RandomWeightedDistributor;
@ -37,12 +38,14 @@ public class StoreModule extends AbstractModule {
private final Settings settings; private final Settings settings;
private final IndexStore indexStore; private final IndexStore indexStore;
private final ShardLock lock;
private Class<? extends Distributor> distributor; private Class<? extends Distributor> distributor;
public StoreModule(Settings settings, IndexStore indexStore) { public StoreModule(Settings settings, IndexStore indexStore, ShardLock lock) {
this.indexStore = indexStore; this.indexStore = indexStore;
this.settings = settings; this.settings = settings;
this.lock = lock;
} }
public void setDistributor(Class<? extends Distributor> distributor) { public void setDistributor(Class<? extends Distributor> distributor) {
@ -53,6 +56,7 @@ public class StoreModule extends AbstractModule {
protected void configure() { protected void configure() {
bind(DirectoryService.class).to(indexStore.shardDirectory()).asEagerSingleton(); bind(DirectoryService.class).to(indexStore.shardDirectory()).asEagerSingleton();
bind(Store.class).asEagerSingleton(); bind(Store.class).asEagerSingleton();
bind(ShardLock.class).toInstance(lock);
if (distributor == null) { if (distributor == null) {
distributor = loadDistributor(settings); distributor = loadDistributor(settings);
} }

View File

@ -20,25 +20,20 @@
package org.elasticsearch.index.store.fs; package org.elasticsearch.index.store.fs;
import org.apache.lucene.store.*; import org.apache.lucene.store.*;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.common.io.FileSystemUtils; import org.elasticsearch.common.io.FileSystemUtils;
import org.elasticsearch.common.metrics.CounterMetric; import org.elasticsearch.common.metrics.CounterMetric;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.settings.IndexSettings; import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.DirectoryService; import org.elasticsearch.index.store.DirectoryService;
import org.elasticsearch.index.store.DirectoryUtils;
import org.elasticsearch.index.store.IndexStore; import org.elasticsearch.index.store.IndexStore;
import java.io.File; import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
import java.io.InterruptedIOException;
/** /**
*/ */
public abstract class FsDirectoryService extends AbstractIndexShardComponent implements DirectoryService, StoreRateLimiting.Listener, StoreRateLimiting.Provider { public abstract class FsDirectoryService extends DirectoryService implements StoreRateLimiting.Listener, StoreRateLimiting.Provider {
protected final FsIndexStore indexStore; protected final FsIndexStore indexStore;

View File

@ -19,9 +19,7 @@
package org.elasticsearch.index.store.fs; package org.elasticsearch.index.store.fs;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.ElasticsearchIllegalStateException; import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.common.io.FileSystemUtils;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.Index; import org.elasticsearch.index.Index;
@ -83,7 +81,7 @@ public abstract class FsIndexStore extends AbstractIndexStore {
throw new ElasticsearchIllegalStateException(shardId + " allocated, can't be deleted"); throw new ElasticsearchIllegalStateException(shardId + " allocated, can't be deleted");
} }
try { try {
IOUtils.rm(FileSystemUtils.toPaths(shardLocations(shardId))); nodeEnv.deleteShardDirectorySafe(shardId);
} catch (Exception ex) { } catch (Exception ex) {
logger.debug("failed to delete shard locations", ex); logger.debug("failed to delete shard locations", ex);
} }

View File

@ -25,17 +25,15 @@ import org.apache.lucene.store.RAMFile;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.settings.IndexSettings; import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.DirectoryService; import org.elasticsearch.index.store.DirectoryService;
import org.elasticsearch.index.store.DirectoryUtils;
import java.io.FileNotFoundException; import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
/** /**
*/ */
public final class RamDirectoryService extends AbstractIndexShardComponent implements DirectoryService { public final class RamDirectoryService extends DirectoryService {
@Inject @Inject
public RamDirectoryService(ShardId shardId, @IndexSettings Settings indexSettings) { public RamDirectoryService(ShardId shardId, @IndexSettings Settings indexSettings) {

View File

@ -25,7 +25,12 @@ import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags;
import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.component.LifecycleComponent; import org.elasticsearch.common.component.LifecycleComponent;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.service.IndexService; import org.elasticsearch.index.service.IndexService;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.Store;
import java.util.List;
/** /**
* *
@ -74,4 +79,33 @@ public interface IndicesService extends Iterable<IndexService>, LifecycleCompone
IndexService createIndex(String index, Settings settings, String localNodeId) throws ElasticsearchException; IndexService createIndex(String index, Settings settings, String localNodeId) throws ElasticsearchException;
void removeIndex(String index, String reason) throws ElasticsearchException; void removeIndex(String index, String reason) throws ElasticsearchException;
void removeIndex(String index, String reason, @Nullable IndexCloseListener listener) throws ElasticsearchException;
/**
* A listener interface that can be used to get notification once a shard or all shards
* of an certain index that are allocated on a node are actually closed. The listener methods
* are invoked once the actual low level instance modifying or reading a shard are closed in contrast to
* removal methods that might return earlier.
*/
public static interface IndexCloseListener {
/**
* Invoked once all shards are closed or their closing failed.
* @param index the index that got closed
* @param failures the recorded shard closing failures
*/
public void onAllShardsClosed(Index index, List<Throwable> failures);
/**
* Invoked once the last resource using the given shard ID is released
*/
public void onShardClosed(ShardId shardId);
/**
* Invoked if closing the given shard failed.
*/
public void onShardCloseFailed(ShardId shardId, Throwable t);
}
} }

View File

@ -65,6 +65,7 @@ import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.index.similarity.SimilarityModule; import org.elasticsearch.index.similarity.SimilarityModule;
import org.elasticsearch.index.store.IndexStore; import org.elasticsearch.index.store.IndexStore;
import org.elasticsearch.index.store.IndexStoreModule; import org.elasticsearch.index.store.IndexStoreModule;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.indices.analysis.IndicesAnalysisService; import org.elasticsearch.indices.analysis.IndicesAnalysisService;
import org.elasticsearch.indices.recovery.RecoverySettings; import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.indices.store.IndicesStore; import org.elasticsearch.indices.store.IndicesStore;
@ -74,10 +75,8 @@ import org.elasticsearch.plugins.PluginsService;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.*;
import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import static com.google.common.collect.Maps.newHashMap; import static com.google.common.collect.Maps.newHashMap;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS; import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
@ -136,17 +135,27 @@ public class InternalIndicesService extends AbstractLifecycleComponent<IndicesSe
@Override @Override
public void run() { public void run() {
try { try {
removeIndex(index, "shutdown", shardsStopExecutor); removeIndex(index, "shutdown", shardsStopExecutor, new IndexCloseListener() {
} catch (Throwable e) { @Override
logger.warn("failed to delete index on stop [" + index + "]", e); public void onAllShardsClosed(Index index, List<Throwable> failures) {
} finally {
latch.countDown(); latch.countDown();
} }
@Override
public void onShardClosed(ShardId shardId) {}
@Override
public void onShardCloseFailed(ShardId shardId, Throwable t) {}
});
} catch (Throwable e) {
latch.countDown();
logger.warn("failed to delete index on stop [" + index + "]", e);
}
} }
}); });
} }
try { try {
latch.await(); if (latch.await(30, TimeUnit.SECONDS) == false) {
logger.warn("Not all shards are closed yet, waited 30sec - stopping service");
}
} catch (InterruptedException e) { } catch (InterruptedException e) {
// ignore // ignore
} finally { } finally {
@ -316,10 +325,15 @@ public class InternalIndicesService extends AbstractLifecycleComponent<IndicesSe
@Override @Override
public void removeIndex(String index, String reason) throws ElasticsearchException { public void removeIndex(String index, String reason) throws ElasticsearchException {
removeIndex(index, reason, null); removeIndex(index, reason, null, null);
} }
private synchronized void removeIndex(String index, String reason, @Nullable Executor executor) throws ElasticsearchException { @Override
public void removeIndex(String index, String reason, @Nullable IndexCloseListener listener) throws ElasticsearchException {
removeIndex(index, reason, null, listener);
}
private synchronized void removeIndex(String index, String reason, @Nullable Executor executor, @Nullable IndexCloseListener listener) throws ElasticsearchException {
IndexService indexService; IndexService indexService;
Injector indexInjector = indicesInjectors.remove(index); Injector indexInjector = indicesInjectors.remove(index);
if (indexInjector == null) { if (indexInjector == null) {
@ -338,7 +352,7 @@ public class InternalIndicesService extends AbstractLifecycleComponent<IndicesSe
} }
logger.debug("[{}] closing index service", index, reason); logger.debug("[{}] closing index service", index, reason);
((InternalIndexService) indexService).close(reason, executor); ((InternalIndexService) indexService).close(reason, executor, listener);
logger.debug("[{}] closing index cache", index, reason); logger.debug("[{}] closing index cache", index, reason);
indexInjector.getInstance(IndexCache.class).close(); indexInjector.getInstance(IndexCache.class).close();
@ -363,6 +377,7 @@ public class InternalIndicesService extends AbstractLifecycleComponent<IndicesSe
logger.debug("[{}] closed... (reason [{}])", index, reason); logger.debug("[{}] closed... (reason [{}])", index, reason);
indicesLifecycle.afterIndexClosed(indexService.index()); indicesLifecycle.afterIndexClosed(indexService.index());
} }
static class OldShardsStats extends IndicesLifecycle.Listener { static class OldShardsStats extends IndicesLifecycle.Listener {

View File

@ -46,6 +46,8 @@ import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexShardAlreadyExistsException; import org.elasticsearch.index.IndexShardAlreadyExistsException;
import org.elasticsearch.index.IndexShardMissingException; import org.elasticsearch.index.IndexShardMissingException;
import org.elasticsearch.index.aliases.IndexAlias; import org.elasticsearch.index.aliases.IndexAlias;
@ -67,10 +69,8 @@ import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.indices.recovery.RecoveryTarget; import org.elasticsearch.indices.recovery.RecoveryTarget;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import java.util.HashMap; import java.io.IOException;
import java.util.Iterator; import java.util.*;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
@ -97,6 +97,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
// a list of shards that failed during recovery // a list of shards that failed during recovery
// we keep track of these shards in order to prevent repeated recovery of these shards on each cluster state update // we keep track of these shards in order to prevent repeated recovery of these shards on each cluster state update
private final ConcurrentMap<ShardId, FailedShard> failedShards = ConcurrentCollections.newConcurrentMap(); private final ConcurrentMap<ShardId, FailedShard> failedShards = ConcurrentCollections.newConcurrentMap();
private final NodeEnvironment nodeEnvironment;
static class FailedShard { static class FailedShard {
public final long version; public final long version;
@ -119,7 +120,8 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
ThreadPool threadPool, RecoveryTarget recoveryTarget, ThreadPool threadPool, RecoveryTarget recoveryTarget,
ShardStateAction shardStateAction, ShardStateAction shardStateAction,
NodeIndexDeletedAction nodeIndexDeletedAction, NodeIndexDeletedAction nodeIndexDeletedAction,
NodeMappingRefreshAction nodeMappingRefreshAction) { NodeMappingRefreshAction nodeMappingRefreshAction,
NodeEnvironment nodeEnvironment) {
super(settings); super(settings);
this.indicesService = indicesService; this.indicesService = indicesService;
this.clusterService = clusterService; this.clusterService = clusterService;
@ -130,6 +132,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
this.nodeMappingRefreshAction = nodeMappingRefreshAction; this.nodeMappingRefreshAction = nodeMappingRefreshAction;
this.sendRefreshMapping = componentSettings.getAsBoolean("send_refresh_mapping", true); this.sendRefreshMapping = componentSettings.getAsBoolean("send_refresh_mapping", true);
this.nodeEnvironment = nodeEnvironment;
} }
@Override @Override
@ -254,7 +257,33 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
if (logger.isDebugEnabled()) { if (logger.isDebugEnabled()) {
logger.debug("[{}] cleaning index, no longer part of the metadata", index); logger.debug("[{}] cleaning index, no longer part of the metadata", index);
} }
removeIndex(index, "index no longer part of the metadata"); removeIndex(index, "index no longer part of the metadata", new IndicesService.IndexCloseListener() {
@Override
public void onAllShardsClosed(Index index, List<Throwable> failures) {
try {
nodeEnvironment.deleteIndexDirectorySafe(index);
logger.debug("deleted index [{}] from filesystem", index);
} catch (Exception e) {
logger.debug("failed to deleted index [{}] from filesystem", e, index);
// ignore - still some shards locked here
}
}
@Override
public void onShardClosed(ShardId shardId) {
try {
nodeEnvironment.deleteShardDirectorySafe(shardId);
logger.debug("deleted shard [{}] from filesystem", shardId);
} catch (IOException e) {
logger.warn("Can't delete shard {} ", e, shardId);
}
}
@Override
public void onShardCloseFailed(ShardId shardId, Throwable t) {
}
});
} }
} }
} }
@ -839,10 +868,13 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
} }
} }
} }
private void removeIndex(String index, String reason) { private void removeIndex(String index, String reason) {
removeIndex(index, reason, null);
}
private void removeIndex(String index, String reason, @Nullable IndicesService.IndexCloseListener listener) {
try { try {
indicesService.removeIndex(index, reason); indicesService.removeIndex(index, reason, listener);
} catch (Throwable e) { } catch (Throwable e) {
logger.warn("failed to clean index ({})", e, reason); logger.warn("failed to clean index ({})", e, reason);
} }

View File

@ -20,7 +20,6 @@
package org.elasticsearch.indices.store; package org.elasticsearch.indices.store;
import org.apache.lucene.store.StoreRateLimiting; import org.apache.lucene.store.StoreRateLimiting;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.Version; import org.elasticsearch.Version;
import org.elasticsearch.cluster.*; import org.elasticsearch.cluster.*;
import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNode;
@ -325,7 +324,7 @@ public class IndicesStore extends AbstractComponent implements ClusterStateListe
if (FileSystemUtils.exists(shardLocations)) { if (FileSystemUtils.exists(shardLocations)) {
logger.debug("{} deleting shard that is no longer used", shardId); logger.debug("{} deleting shard that is no longer used", shardId);
try { try {
IOUtils.rm(FileSystemUtils.toPaths(shardLocations)); nodeEnv.deleteShardDirectorySafe(shardId);
} catch (Exception ex) { } catch (Exception ex) {
logger.debug("failed to delete shard locations", ex); logger.debug("failed to delete shard locations", ex);
} }

View File

@ -21,6 +21,7 @@ package org.elasticsearch.node.internal;
import org.elasticsearch.Build; import org.elasticsearch.Build;
import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.Version; import org.elasticsearch.Version;
import org.elasticsearch.action.ActionModule; import org.elasticsearch.action.ActionModule;
import org.elasticsearch.action.bench.BenchmarkModule; import org.elasticsearch.action.bench.BenchmarkModule;
@ -98,6 +99,7 @@ import org.elasticsearch.tribe.TribeService;
import org.elasticsearch.watcher.ResourceWatcherModule; import org.elasticsearch.watcher.ResourceWatcherModule;
import org.elasticsearch.watcher.ResourceWatcherService; import org.elasticsearch.watcher.ResourceWatcherService;
import java.io.IOException;
import java.util.Arrays; import java.util.Arrays;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -150,8 +152,12 @@ public final class InternalNode implements Node {
this.environment = new Environment(this.settings()); this.environment = new Environment(this.settings());
CompressorFactory.configure(settings); CompressorFactory.configure(settings);
final NodeEnvironment nodeEnvironment;
NodeEnvironment nodeEnvironment = new NodeEnvironment(this.settings, this.environment); try {
nodeEnvironment = new NodeEnvironment(this.settings, this.environment);
} catch (IOException ex) {
throw new ElasticsearchIllegalStateException("Failed to created node environment", ex);
}
boolean success = false; boolean success = false;
try { try {

View File

@ -0,0 +1,305 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.env;
import org.apache.lucene.store.LockObtainFailedException;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.ElasticsearchTestCase;
import org.junit.Test;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
public class NodeEnvironmentTests extends ElasticsearchTestCase {
@Test
public void testNodeLockSingleEnvironment() throws IOException {
String[] dataPaths = tmpPaths();
Settings settings = ImmutableSettings.builder()
.put(nodeEnvSettings(dataPaths))
.put("node.max_local_storage_nodes", 1).build();
NodeEnvironment env = new NodeEnvironment(settings, new Environment(settings));
try {
new NodeEnvironment(settings, new Environment(settings));
fail("env is already locked");
} catch (ElasticsearchIllegalStateException ex) {
}
env.close();
// now can recreate and lock it
env = new NodeEnvironment(settings, new Environment(settings));
assertEquals(env.nodeDataPaths().length, dataPaths.length);
for (int i = 0; i < dataPaths.length; i++) {
assertTrue(env.nodeDataPaths()[i].startsWith(Paths.get(dataPaths[i])));
}
env.close();
assertTrue("LockedShards: " + env.lockedShards(), env.lockedShards().isEmpty());
}
@Test
public void testNodeLockMultipleEnvironment() throws IOException {
String[] dataPaths = tmpPaths();
Settings settings = nodeEnvSettings(dataPaths);
NodeEnvironment first = new NodeEnvironment(settings, new Environment(settings));
NodeEnvironment second = new NodeEnvironment(settings, new Environment(settings));
assertEquals(first.nodeDataPaths().length, dataPaths.length);
assertEquals(second.nodeDataPaths().length, dataPaths.length);
for (int i = 0; i < dataPaths.length; i++) {
assertEquals(first.nodeDataPaths()[i].getParent(), second.nodeDataPaths()[i].getParent());
}
IOUtils.close(first, second);
}
@Test
public void testShardLock() throws IOException {
Settings settings = nodeEnvSettings(tmpPaths());
NodeEnvironment env = new NodeEnvironment(settings, new Environment(settings));
ShardLock fooLock = env.shardLock(new ShardId("foo", 1));
assertEquals(new ShardId("foo", 1), fooLock.getShardId());
try {
env.shardLock(new ShardId("foo", 1));
fail("shard is locked");
} catch (LockObtainFailedException ex) {
// expected
}
for (Path path : env.indexPaths(new Index("foo"))) {
Files.createDirectories(path.resolve("1"));
Files.createDirectories(path.resolve("2"));
}
try {
env.lockAllForIndex(new Index("foo"));
fail("shard 1 is locked");
} catch (LockObtainFailedException ex) {
// expected
}
fooLock.close();
// can lock again?
env.shardLock(new ShardId("foo", 1)).close();
List<ShardLock> locks = env.lockAllForIndex(new Index("foo"));
try {
env.shardLock(new ShardId("foo", randomBoolean() ? 1 : 2));
fail("shard is locked");
} catch (LockObtainFailedException ex) {
// expected
}
IOUtils.close(locks);
assertTrue("LockedShards: " + env.lockedShards(), env.lockedShards().isEmpty());
}
@Test
public void testGetAllIndices() throws Exception {
Settings settings = nodeEnvSettings(tmpPaths());
NodeEnvironment env = new NodeEnvironment(settings, new Environment(settings));
final int numIndices = randomIntBetween(1, 10);
for (int i = 0; i < numIndices; i++) {
for (Path path : env.indexPaths(new Index("foo" + i))) {
Files.createDirectories(path);
}
}
Set<String> indices = env.findAllIndices();
assertEquals(indices.size(), numIndices);
for (int i = 0; i < numIndices; i++) {
assertTrue(indices.contains("foo" + i));
}
assertTrue("LockedShards: " + env.lockedShards(), env.lockedShards().isEmpty());
}
@Test
public void testDeleteSafe() throws IOException {
Settings settings = nodeEnvSettings(tmpPaths());
NodeEnvironment env = new NodeEnvironment(settings, new Environment(settings));
ShardLock fooLock = env.shardLock(new ShardId("foo", 1));
assertEquals(new ShardId("foo", 1), fooLock.getShardId());
for (Path path : env.indexPaths(new Index("foo"))) {
Files.createDirectories(path.resolve("1"));
Files.createDirectories(path.resolve("2"));
}
try {
env.deleteShardDirectorySafe(new ShardId("foo", 1));
fail("shard is locked");
} catch (LockObtainFailedException ex) {
// expected
}
for (Path path : env.indexPaths(new Index("foo"))) {
assertTrue(Files.exists(path.resolve("1")));
assertTrue(Files.exists(path.resolve("2")));
}
env.deleteShardDirectorySafe(new ShardId("foo", 2));
for (Path path : env.indexPaths(new Index("foo"))) {
assertTrue(Files.exists(path.resolve("1")));
assertFalse(Files.exists(path.resolve("2")));
}
try {
env.deleteIndexDirectorySafe(new Index("foo"));
fail("shard is locked");
} catch (LockObtainFailedException ex) {
// expected
}
fooLock.close();
for (Path path : env.indexPaths(new Index("foo"))) {
assertTrue(Files.exists(path));
}
env.deleteIndexDirectorySafe(new Index("foo"));
for (Path path : env.indexPaths(new Index("foo"))) {
assertFalse(Files.exists(path));
}
assertTrue("LockedShards: " + env.lockedShards(), env.lockedShards().isEmpty());
}
@Test
public void testGetAllShards() throws Exception {
Settings settings = nodeEnvSettings(tmpPaths());
NodeEnvironment env = new NodeEnvironment(settings, new Environment(settings));
final int numIndices = randomIntBetween(1, 10);
final Set<ShardId> createdShards = new HashSet<>();
for (int i = 0; i < numIndices; i++) {
for (Path path : env.indexPaths(new Index("foo" + i))) {
final int numShards = randomIntBetween(1, 10);
for (int j = 0; j < numShards; j++) {
Files.createDirectories(path.resolve(Integer.toString(j)));
createdShards.add(new ShardId("foo" + i, j));
}
}
}
Set<ShardId> shards = env.findAllShardIds();
assertEquals(shards.size(), createdShards.size());
assertEquals(shards, createdShards);
Index index = new Index("foo" + randomIntBetween(1, numIndices));
shards = env.findAllShardIds(index);
for (ShardId id : createdShards) {
if (index.getName().equals(id.getIndex())) {
assertNotNull("missing shard " + id, shards.remove(id));
}
}
assertEquals("too many shards found", shards.size(), 0);
assertTrue("LockedShards: " + env.lockedShards(), env.lockedShards().isEmpty());
}
@Test
public void testStressShardLock() throws IOException, InterruptedException {
class Int {
int value = 0;
}
Settings settings = nodeEnvSettings(tmpPaths());
final NodeEnvironment env = new NodeEnvironment(settings, new Environment(settings));
final int shards = randomIntBetween(2, 10);
final Int[] counts = new Int[shards];
final AtomicInteger[] countsAtomic = new AtomicInteger[shards];
final AtomicInteger[] flipFlop = new AtomicInteger[shards];
for (int i = 0; i < counts.length; i++) {
counts[i] = new Int();
countsAtomic[i] = new AtomicInteger();
flipFlop[i] = new AtomicInteger();
}
Thread[] threads = new Thread[randomIntBetween(2,5)];
final CountDownLatch latch = new CountDownLatch(1);
final int iters = scaledRandomIntBetween(10000, 100000);
for (int i = 0; i < threads.length; i++) {
threads[i] = new Thread() {
@Override
public void run() {
try {
latch.await();
} catch (InterruptedException e) {
fail(e.getMessage());
}
for (int i = 0; i < iters; i++) {
int shard = randomIntBetween(0, counts.length-1);
try {
try (ShardLock _ = env.shardLock(new ShardId("foo", shard), scaledRandomIntBetween(0, 10))) {
counts[shard].value++;
countsAtomic[shard].incrementAndGet();
assertEquals(flipFlop[shard].incrementAndGet(), 1);
assertEquals(flipFlop[shard].decrementAndGet(), 0);
}
} catch (LockObtainFailedException ex) {
// ok
} catch (IOException ex) {
fail(ex.toString());
}
}
}
};
threads[i].start();
}
latch.countDown(); // fire the threads up
for (int i = 0; i < threads.length; i++) {
threads[i].join();
}
assertTrue("LockedShards: " + env.lockedShards(), env.lockedShards().isEmpty());
for (int i = 0; i < counts.length; i++) {
assertTrue(counts[i].value > 0);
assertEquals(flipFlop[i].get(), 0);
assertEquals(counts[i].value, countsAtomic[i].get());
}
}
private String[] tmpPaths() {
final int numPaths = randomIntBetween(1, 3);
final String[] absPaths = new String[numPaths];
for (int i = 0; i < numPaths; i++) {
absPaths[i] = newTempDir().getAbsolutePath();
}
return absPaths;
}
private Settings nodeEnvSettings(String[] dataPaths) {
return ImmutableSettings.builder()
.put("path.home", newTempDir().getAbsolutePath())
.putArray("path.data", dataPaths).build();
}
}

View File

@ -30,10 +30,9 @@ import org.apache.lucene.document.NumericDocValuesField;
import org.apache.lucene.document.TextField; import org.apache.lucene.document.TextField;
import org.apache.lucene.index.CorruptIndexException; import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.index.IndexDeletionPolicy; import org.apache.lucene.index.IndexDeletionPolicy;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.index.Term; import org.apache.lucene.index.Term;
import org.apache.lucene.search.TermQuery; import org.apache.lucene.search.TermQuery;
import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.store.Lock;
import org.apache.lucene.util.LuceneTestCase.Slow; import org.apache.lucene.util.LuceneTestCase.Slow;
import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version; import org.elasticsearch.Version;
@ -44,6 +43,7 @@ import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.lucene.uid.Versions; import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.ShardLock;
import org.elasticsearch.index.Index; import org.elasticsearch.index.Index;
import org.elasticsearch.index.VersionType; import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.analysis.AnalysisService; import org.elasticsearch.index.analysis.AnalysisService;
@ -73,6 +73,7 @@ import org.elasticsearch.index.store.ram.RamDirectoryService;
import org.elasticsearch.index.translog.Translog; import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogSizeMatcher; import org.elasticsearch.index.translog.TranslogSizeMatcher;
import org.elasticsearch.index.translog.fs.FsTranslog; import org.elasticsearch.index.translog.fs.FsTranslog;
import org.elasticsearch.test.DummyShardLock;
import org.elasticsearch.test.ElasticsearchTestCase; import org.elasticsearch.test.ElasticsearchTestCase;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.hamcrest.MatcherAssert; import org.hamcrest.MatcherAssert;
@ -173,12 +174,12 @@ public class InternalEngineTests extends ElasticsearchTestCase {
protected Store createStore() throws IOException { protected Store createStore() throws IOException {
DirectoryService directoryService = new RamDirectoryService(shardId, EMPTY_SETTINGS); DirectoryService directoryService = new RamDirectoryService(shardId, EMPTY_SETTINGS);
return new Store(shardId, EMPTY_SETTINGS, null, directoryService, new LeastUsedDistributor(directoryService)); return new Store(shardId, EMPTY_SETTINGS, null, directoryService, new LeastUsedDistributor(directoryService), new DummyShardLock(shardId));
} }
protected Store createStoreReplica() throws IOException { protected Store createStoreReplica() throws IOException {
DirectoryService directoryService = new RamDirectoryService(shardId, EMPTY_SETTINGS); DirectoryService directoryService = new RamDirectoryService(shardId, EMPTY_SETTINGS);
return new Store(shardId, EMPTY_SETTINGS, null, directoryService, new LeastUsedDistributor(directoryService)); return new Store(shardId, EMPTY_SETTINGS, null, directoryService, new LeastUsedDistributor(directoryService), new DummyShardLock(shardId));
} }
protected Translog createTranslog() { protected Translog createTranslog() {

View File

@ -18,9 +18,11 @@
*/ */
package org.elasticsearch.index.merge.policy; package org.elasticsearch.index.merge.policy;
import org.apache.lucene.store.Lock;
import org.elasticsearch.ElasticsearchIllegalArgumentException; import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.ShardLock;
import org.elasticsearch.index.Index; import org.elasticsearch.index.Index;
import org.elasticsearch.index.settings.IndexSettingsService; import org.elasticsearch.index.settings.IndexSettingsService;
import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardId;
@ -28,6 +30,7 @@ import org.elasticsearch.index.store.DirectoryService;
import org.elasticsearch.index.store.Store; import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.store.distributor.LeastUsedDistributor; import org.elasticsearch.index.store.distributor.LeastUsedDistributor;
import org.elasticsearch.index.store.ram.RamDirectoryService; import org.elasticsearch.index.store.ram.RamDirectoryService;
import org.elasticsearch.test.DummyShardLock;
import org.elasticsearch.test.ElasticsearchTestCase; import org.elasticsearch.test.ElasticsearchTestCase;
import org.junit.Test; import org.junit.Test;
@ -172,7 +175,7 @@ public class MergePolicySettingsTest extends ElasticsearchTestCase {
protected Store createStore(Settings settings) throws IOException { protected Store createStore(Settings settings) throws IOException {
DirectoryService directoryService = new RamDirectoryService(shardId, EMPTY_SETTINGS); DirectoryService directoryService = new RamDirectoryService(shardId, EMPTY_SETTINGS);
return new Store(shardId, settings, null, directoryService, new LeastUsedDistributor(directoryService)); return new Store(shardId, settings, null, directoryService, new LeastUsedDistributor(directoryService), new DummyShardLock(shardId));
} }
} }

View File

@ -92,7 +92,7 @@ public class CorruptedFileTest extends ElasticsearchIntegrationTest {
protected Settings nodeSettings(int nodeOrdinal) { protected Settings nodeSettings(int nodeOrdinal) {
return ImmutableSettings.builder() return ImmutableSettings.builder()
// we really need local GW here since this also checks for corruption etc. // we really need local GW here since this also checks for corruption etc.
// and we need to make sure primaries are not just trashed if we don'tmvn have replicas // and we need to make sure primaries are not just trashed if we don't have replicas
.put(super.nodeSettings(nodeOrdinal)).put("gateway.type", "local") .put(super.nodeSettings(nodeOrdinal)).put("gateway.type", "local")
.put(TransportModule.TRANSPORT_SERVICE_TYPE_KEY, MockTransportService.class.getName()).build(); .put(TransportModule.TRANSPORT_SERVICE_TYPE_KEY, MockTransportService.class.getName()).build();
} }

View File

@ -18,10 +18,6 @@
*/ */
package org.elasticsearch.index.store; package org.elasticsearch.index.store;
import com.carrotsearch.randomizedtesting.annotations.Listeners;
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakScope;
import com.carrotsearch.randomizedtesting.annotations.TimeoutSuite;
import com.carrotsearch.randomizedtesting.annotations.*; import com.carrotsearch.randomizedtesting.annotations.*;
import com.carrotsearch.randomizedtesting.generators.RandomPicks; import com.carrotsearch.randomizedtesting.generators.RandomPicks;
import org.apache.lucene.index.IndexFileNames; import org.apache.lucene.index.IndexFileNames;
@ -35,12 +31,10 @@ import org.elasticsearch.index.store.distributor.Distributor;
import org.elasticsearch.test.ElasticsearchThreadFilter; import org.elasticsearch.test.ElasticsearchThreadFilter;
import org.elasticsearch.test.junit.listeners.LoggingListener; import org.elasticsearch.test.junit.listeners.LoggingListener;
import java.io.IOException;
import java.nio.file.Path;
import java.io.File;
import java.io.FileNotFoundException; import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
import java.nio.file.NoSuchFileException; import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.util.Arrays; import java.util.Arrays;
@ThreadLeakFilters(defaultFilters = true, filters = {ElasticsearchThreadFilter.class}) @ThreadLeakFilters(defaultFilters = true, filters = {ElasticsearchThreadFilter.class})
@ -62,13 +56,7 @@ public class DistributorDirectoryTest extends BaseDirectoryTestCase {
((MockDirectoryWrapper) directories[i]).setEnableVirusScanner(false); ((MockDirectoryWrapper) directories[i]).setEnableVirusScanner(false);
} }
} }
return new FilterDirectory(new DistributorDirectory(directories)) { return new DistributorDirectory(directories);
@Override
public void close() throws IOException {
assertTrue(DistributorDirectory.assertConsistency(logger, ((DistributorDirectory) this.getDelegate())));
super.close();
}
};
} }
// #7306: don't invoke the distributor when we are opening an already existing file // #7306: don't invoke the distributor when we are opening an already existing file
@ -105,7 +93,6 @@ public class DistributorDirectoryTest extends BaseDirectoryTestCase {
} catch (IllegalStateException ise) { } catch (IllegalStateException ise) {
// expected // expected
} }
assertTrue(DistributorDirectory.assertConsistency(logger, dd));
dd.close(); dd.close();
} }
@ -167,7 +154,6 @@ public class DistributorDirectoryTest extends BaseDirectoryTestCase {
// target file already exists // target file already exists
} }
} }
assertTrue(DistributorDirectory.assertConsistency(logger, dd));
IOUtils.close(dd); IOUtils.close(dd);
} }
} }

View File

@ -27,7 +27,6 @@ import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.ThreadedIndexingAndSearchingTestCase; import org.apache.lucene.index.ThreadedIndexingAndSearchingTestCase;
import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.store.Directory; import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FilterDirectory;
import org.apache.lucene.store.MockDirectoryWrapper; import org.apache.lucene.store.MockDirectoryWrapper;
import org.apache.lucene.util.LuceneTestCase; import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.logging.ESLogger;
@ -153,17 +152,11 @@ public class DistributorInTheWildTest extends ThreadedIndexingAndSearchingTestCa
} }
try { try {
FilterDirectory distributorDirectory = new FilterDirectory(new DistributorDirectory(directories)) {
@Override
public void close() throws IOException {
assertTrue(DistributorDirectory.assertConsistency(logger, (DistributorDirectory) this.getDelegate()));
super.close();
}
};
if (random().nextBoolean()) { if (random().nextBoolean()) {
return new MockDirectoryWrapper(random(), distributorDirectory); return new MockDirectoryWrapper(random(), new DistributorDirectory(directories));
} else { } else {
return distributorDirectory; return new DistributorDirectory(directories);
} }
} catch (IOException ex) { } catch (IOException ex) {
throw new RuntimeException(ex); throw new RuntimeException(ex);

View File

@ -18,7 +18,6 @@
*/ */
package org.elasticsearch.index.store; package org.elasticsearch.index.store;
import com.carrotsearch.randomizedtesting.annotations.Repeat;
import org.apache.lucene.analysis.MockAnalyzer; import org.apache.lucene.analysis.MockAnalyzer;
import org.apache.lucene.codecs.CodecUtil; import org.apache.lucene.codecs.CodecUtil;
import org.apache.lucene.document.*; import org.apache.lucene.document.*;
@ -29,11 +28,13 @@ import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.TestUtil; import org.apache.lucene.util.TestUtil;
import org.apache.lucene.util.Version; import org.apache.lucene.util.Version;
import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.env.ShardLock;
import org.elasticsearch.index.Index; import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.distributor.Distributor; import org.elasticsearch.index.store.distributor.Distributor;
import org.elasticsearch.index.store.distributor.LeastUsedDistributor; import org.elasticsearch.index.store.distributor.LeastUsedDistributor;
import org.elasticsearch.index.store.distributor.RandomWeightedDistributor; import org.elasticsearch.index.store.distributor.RandomWeightedDistributor;
import org.elasticsearch.test.DummyShardLock;
import org.elasticsearch.test.ElasticsearchLuceneTestCase; import org.elasticsearch.test.ElasticsearchLuceneTestCase;
import org.junit.Test; import org.junit.Test;
@ -41,6 +42,7 @@ import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
import java.nio.file.NoSuchFileException; import java.nio.file.NoSuchFileException;
import java.util.*; import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.zip.Adler32; import java.util.zip.Adler32;
import static com.carrotsearch.randomizedtesting.RandomizedTest.*; import static com.carrotsearch.randomizedtesting.RandomizedTest.*;
@ -52,7 +54,7 @@ public class StoreTest extends ElasticsearchLuceneTestCase {
public void testRefCount() throws IOException { public void testRefCount() throws IOException {
final ShardId shardId = new ShardId(new Index("index"), 1); final ShardId shardId = new ShardId(new Index("index"), 1);
DirectoryService directoryService = new LuceneManagedDirectoryService(random()); DirectoryService directoryService = new LuceneManagedDirectoryService(random());
Store store = new Store(shardId, ImmutableSettings.EMPTY, null, directoryService, randomDistributor(directoryService)); Store store = new Store(shardId, ImmutableSettings.EMPTY, null, directoryService, randomDistributor(directoryService), new DummyShardLock(shardId));
int incs = randomIntBetween(1, 100); int incs = randomIntBetween(1, 100);
for (int i = 0; i < incs; i++) { for (int i = 0; i < incs; i++) {
if (randomBoolean()) { if (randomBoolean()) {
@ -69,7 +71,14 @@ public class StoreTest extends ElasticsearchLuceneTestCase {
} }
store.incRef(); store.incRef();
store.close(); final AtomicBoolean called = new AtomicBoolean(false);
Store.OnCloseListener listener = new Store.OnCloseListener() {
@Override
public void onClose(ShardId shardId) {
assertTrue(called.compareAndSet(false, true));
}
};
store.close(listener);
for (int i = 0; i < incs; i++) { for (int i = 0; i < incs; i++) {
if (randomBoolean()) { if (randomBoolean()) {
store.incRef(); store.incRef();
@ -84,7 +93,9 @@ public class StoreTest extends ElasticsearchLuceneTestCase {
store.ensureOpen(); store.ensureOpen();
} }
assertFalse(called.get());
store.decRef(); store.decRef();
assertTrue(called.get());
assertFalse(store.tryIncRef()); assertFalse(store.tryIncRef());
try { try {
store.incRef(); store.incRef();
@ -100,6 +111,27 @@ public class StoreTest extends ElasticsearchLuceneTestCase {
} }
} }
@Test
public void testListenerCanThrowException() throws IOException {
final ShardId shardId = new ShardId(new Index("index"), 1);
DirectoryService directoryService = new LuceneManagedDirectoryService(random());
final ShardLock shardLock = new DummyShardLock(shardId);
Store store = new Store(shardId, ImmutableSettings.EMPTY, null, directoryService, randomDistributor(directoryService), shardLock);
final AtomicBoolean called = new AtomicBoolean(false);
Store.OnCloseListener listener = new Store.OnCloseListener() {
@Override
public void onClose(ShardId shardId) {
assertTrue(called.compareAndSet(false, true));
throw new RuntimeException("foobar");
}
};
assertTrue(shardLock.isOpen());
store.close(listener);
assertTrue(called.get());
assertFalse(shardLock.isOpen());
// test will barf if the directory is not closed
}
@Test @Test
public void testVerifyingIndexOutput() throws IOException { public void testVerifyingIndexOutput() throws IOException {
Directory dir = newDirectory(); Directory dir = newDirectory();
@ -160,7 +192,7 @@ public class StoreTest extends ElasticsearchLuceneTestCase {
public void testWriteLegacyChecksums() throws IOException { public void testWriteLegacyChecksums() throws IOException {
final ShardId shardId = new ShardId(new Index("index"), 1); final ShardId shardId = new ShardId(new Index("index"), 1);
DirectoryService directoryService = new LuceneManagedDirectoryService(random()); DirectoryService directoryService = new LuceneManagedDirectoryService(random());
Store store = new Store(shardId, ImmutableSettings.EMPTY, null, directoryService, randomDistributor(directoryService)); Store store = new Store(shardId, ImmutableSettings.EMPTY, null, directoryService, randomDistributor(directoryService), new DummyShardLock(shardId));
// set default codec - all segments need checksums // set default codec - all segments need checksums
IndexWriter writer = new IndexWriter(store.directory(), newIndexWriterConfig(random(), new MockAnalyzer(random())).setCodec(actualDefaultCodec())); IndexWriter writer = new IndexWriter(store.directory(), newIndexWriterConfig(random(), new MockAnalyzer(random())).setCodec(actualDefaultCodec()));
int docs = 1 + random().nextInt(100); int docs = 1 + random().nextInt(100);
@ -229,7 +261,7 @@ public class StoreTest extends ElasticsearchLuceneTestCase {
public void testNewChecksums() throws IOException { public void testNewChecksums() throws IOException {
final ShardId shardId = new ShardId(new Index("index"), 1); final ShardId shardId = new ShardId(new Index("index"), 1);
DirectoryService directoryService = new LuceneManagedDirectoryService(random()); DirectoryService directoryService = new LuceneManagedDirectoryService(random());
Store store = new Store(shardId, ImmutableSettings.EMPTY, null, directoryService, randomDistributor(directoryService)); Store store = new Store(shardId, ImmutableSettings.EMPTY, null, directoryService, randomDistributor(directoryService), new DummyShardLock(shardId));
// set default codec - all segments need checksums // set default codec - all segments need checksums
IndexWriter writer = new IndexWriter(store.directory(), newIndexWriterConfig(random(), new MockAnalyzer(random())).setCodec(actualDefaultCodec())); IndexWriter writer = new IndexWriter(store.directory(), newIndexWriterConfig(random(), new MockAnalyzer(random())).setCodec(actualDefaultCodec()));
int docs = 1 + random().nextInt(100); int docs = 1 + random().nextInt(100);
@ -289,7 +321,7 @@ public class StoreTest extends ElasticsearchLuceneTestCase {
public void testMixedChecksums() throws IOException { public void testMixedChecksums() throws IOException {
final ShardId shardId = new ShardId(new Index("index"), 1); final ShardId shardId = new ShardId(new Index("index"), 1);
DirectoryService directoryService = new LuceneManagedDirectoryService(random()); DirectoryService directoryService = new LuceneManagedDirectoryService(random());
Store store = new Store(shardId, ImmutableSettings.EMPTY, null, directoryService, randomDistributor(directoryService)); Store store = new Store(shardId, ImmutableSettings.EMPTY, null, directoryService, randomDistributor(directoryService), new DummyShardLock(shardId));
// this time random codec.... // this time random codec....
IndexWriter writer = new IndexWriter(store.directory(), newIndexWriterConfig(random(), new MockAnalyzer(random())).setCodec(actualDefaultCodec())); IndexWriter writer = new IndexWriter(store.directory(), newIndexWriterConfig(random(), new MockAnalyzer(random())).setCodec(actualDefaultCodec()));
int docs = 1 + random().nextInt(100); int docs = 1 + random().nextInt(100);
@ -381,7 +413,7 @@ public class StoreTest extends ElasticsearchLuceneTestCase {
public void testRenameFile() throws IOException { public void testRenameFile() throws IOException {
final ShardId shardId = new ShardId(new Index("index"), 1); final ShardId shardId = new ShardId(new Index("index"), 1);
DirectoryService directoryService = new LuceneManagedDirectoryService(random(), false); DirectoryService directoryService = new LuceneManagedDirectoryService(random(), false);
Store store = new Store(shardId, ImmutableSettings.EMPTY, null, directoryService, randomDistributor(directoryService)); Store store = new Store(shardId, ImmutableSettings.EMPTY, null, directoryService, randomDistributor(directoryService), new DummyShardLock(shardId));
{ {
IndexOutput output = store.directory().createOutput("foo.bar", IOContext.DEFAULT); IndexOutput output = store.directory().createOutput("foo.bar", IOContext.DEFAULT);
int iters = scaledRandomIntBetween(10, 100); int iters = scaledRandomIntBetween(10, 100);
@ -600,7 +632,7 @@ public class StoreTest extends ElasticsearchLuceneTestCase {
} }
} }
private static final class LuceneManagedDirectoryService implements DirectoryService { private static final class LuceneManagedDirectoryService extends DirectoryService {
private final Directory[] dirs; private final Directory[] dirs;
private final Random random; private final Random random;
@ -608,6 +640,7 @@ public class StoreTest extends ElasticsearchLuceneTestCase {
this(random, true); this(random, true);
} }
public LuceneManagedDirectoryService(Random random, boolean preventDoubleWrite) { public LuceneManagedDirectoryService(Random random, boolean preventDoubleWrite) {
super(new ShardId("fake", 1), ImmutableSettings.EMPTY);
this.dirs = new Directory[1 + random.nextInt(5)]; this.dirs = new Directory[1 + random.nextInt(5)];
for (int i = 0; i < dirs.length; i++) { for (int i = 0; i < dirs.length; i++) {
dirs[i] = newDirectory(random); dirs[i] = newDirectory(random);
@ -669,7 +702,7 @@ public class StoreTest extends ElasticsearchLuceneTestCase {
iwc.setMaxThreadStates(1); iwc.setMaxThreadStates(1);
final ShardId shardId = new ShardId(new Index("index"), 1); final ShardId shardId = new ShardId(new Index("index"), 1);
DirectoryService directoryService = new LuceneManagedDirectoryService(random); DirectoryService directoryService = new LuceneManagedDirectoryService(random);
Store store = new Store(shardId, ImmutableSettings.EMPTY, null, directoryService, randomDistributor(random, directoryService)); Store store = new Store(shardId, ImmutableSettings.EMPTY, null, directoryService, randomDistributor(random, directoryService), new DummyShardLock(shardId));
IndexWriter writer = new IndexWriter(store.directory(), iwc); IndexWriter writer = new IndexWriter(store.directory(), iwc);
final boolean lotsOfSegments = rarely(random); final boolean lotsOfSegments = rarely(random);
for (Document d : docs) { for (Document d : docs) {
@ -700,7 +733,7 @@ public class StoreTest extends ElasticsearchLuceneTestCase {
iwc.setMaxThreadStates(1); iwc.setMaxThreadStates(1);
final ShardId shardId = new ShardId(new Index("index"), 1); final ShardId shardId = new ShardId(new Index("index"), 1);
DirectoryService directoryService = new LuceneManagedDirectoryService(random); DirectoryService directoryService = new LuceneManagedDirectoryService(random);
store = new Store(shardId, ImmutableSettings.EMPTY, null, directoryService, randomDistributor(random, directoryService)); store = new Store(shardId, ImmutableSettings.EMPTY, null, directoryService, randomDistributor(random, directoryService), new DummyShardLock(shardId));
IndexWriter writer = new IndexWriter(store.directory(), iwc); IndexWriter writer = new IndexWriter(store.directory(), iwc);
final boolean lotsOfSegments = rarely(random); final boolean lotsOfSegments = rarely(random);
for (Document d : docs) { for (Document d : docs) {

View File

@ -20,6 +20,8 @@
package org.elasticsearch.index.store.distributor; package org.elasticsearch.index.store.distributor;
import org.apache.lucene.store.*; import org.apache.lucene.store.*;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.DirectoryService; import org.elasticsearch.index.store.DirectoryService;
import org.elasticsearch.test.ElasticsearchTestCase; import org.elasticsearch.test.ElasticsearchTestCase;
import org.junit.Test; import org.junit.Test;
@ -29,7 +31,6 @@ import java.io.IOException;
import java.nio.file.Path; import java.nio.file.Path;
import java.nio.file.Paths; import java.nio.file.Paths;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.*; import static org.hamcrest.Matchers.*;
/** /**
@ -136,11 +137,12 @@ public class DistributorTests extends ElasticsearchTestCase {
} }
public static class FakeDirectoryService implements DirectoryService { public static class FakeDirectoryService extends DirectoryService {
private final Directory[] directories; private final Directory[] directories;
public FakeDirectoryService(Directory[] directories) { public FakeDirectoryService(Directory[] directories) {
super(new ShardId("fake", 1), ImmutableSettings.EMPTY);
this.directories = directories; this.directories = directories;
} }

View File

@ -25,7 +25,6 @@ import org.elasticsearch.cluster.metadata.MetaDataCreateIndexService;
import org.elasticsearch.index.VersionType; import org.elasticsearch.index.VersionType;
import org.elasticsearch.indices.InvalidIndexNameException; import org.elasticsearch.indices.InvalidIndexNameException;
import org.elasticsearch.test.ElasticsearchIntegrationTest; import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.junit.Test; import org.junit.Test;
import java.util.ArrayList; import java.util.ArrayList;
@ -35,7 +34,6 @@ import java.util.Random;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerArray; import java.util.concurrent.atomic.AtomicIntegerArray;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
@ -52,9 +50,8 @@ public class IndexActionTests extends ElasticsearchIntegrationTest {
* while the index is being created. * while the index is being created.
*/ */
@Test @Test
@TestLogging("action.search:TRACE,indices.recovery:TRACE,index.shard.service:TRACE")
public void testAutoGenerateIdNoDuplicates() throws Exception { public void testAutoGenerateIdNoDuplicates() throws Exception {
int numberOfIterations = randomIntBetween(10, 50); int numberOfIterations = scaledRandomIntBetween(10, 50);
for (int i = 0; i < numberOfIterations; i++) { for (int i = 0; i < numberOfIterations; i++) {
Throwable firstError = null; Throwable firstError = null;
createIndex("test"); createIndex("test");
@ -65,6 +62,7 @@ public class IndexActionTests extends ElasticsearchIntegrationTest {
builders.add(client().prepareIndex("test", "type").setSource("field", "value")); builders.add(client().prepareIndex("test", "type").setSource("field", "value"));
} }
indexRandom(true, builders); indexRandom(true, builders);
ensureYellow("test");
logger.info("verifying indexed content"); logger.info("verifying indexed content");
int numOfChecks = randomIntBetween(8, 12); int numOfChecks = randomIntBetween(8, 12);
for (int j = 0; j < numOfChecks; j++) { for (int j = 0; j < numOfChecks; j++) {

View File

@ -0,0 +1,37 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.test;
import org.elasticsearch.env.ShardLock;
import org.elasticsearch.index.shard.ShardId;
/*
* A ShardLock that does nothing... for tests only
*/
public class DummyShardLock extends ShardLock {
public DummyShardLock(ShardId id) {
super(id);
}
@Override
protected void closeInternal() {
}
}

View File

@ -29,6 +29,8 @@ import com.google.common.collect.*;
import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture; import com.google.common.util.concurrent.SettableFuture;
import org.apache.lucene.store.Lock;
import org.apache.lucene.store.NativeFSLockFactory;
import org.apache.lucene.util.AbstractRandomizedTest; import org.apache.lucene.util.AbstractRandomizedTest;
import org.apache.lucene.util.IOUtils; import org.apache.lucene.util.IOUtils;
import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchException;
@ -73,6 +75,7 @@ import org.elasticsearch.index.cache.filter.none.NoneFilterCache;
import org.elasticsearch.index.cache.filter.weighted.WeightedFilterCache; import org.elasticsearch.index.cache.filter.weighted.WeightedFilterCache;
import org.elasticsearch.index.engine.IndexEngineModule; import org.elasticsearch.index.engine.IndexEngineModule;
import org.elasticsearch.index.service.IndexService; import org.elasticsearch.index.service.IndexService;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService; import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService;
@ -99,7 +102,11 @@ import java.io.Closeable;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.nio.file.FileVisitResult;
import java.nio.file.FileVisitor;
import java.nio.file.Files;
import java.nio.file.Path; import java.nio.file.Path;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.*; import java.util.*;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -114,6 +121,7 @@ import static org.elasticsearch.node.NodeBuilder.nodeBuilder;
import static org.elasticsearch.test.ElasticsearchTestCase.assertBusy; import static org.elasticsearch.test.ElasticsearchTestCase.assertBusy;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoTimeout; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoTimeout;
import static org.hamcrest.Matchers.*; import static org.hamcrest.Matchers.*;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat; import static org.junit.Assert.assertThat;
@ -1698,4 +1706,19 @@ public final class InternalTestCluster extends TestCluster {
} }
} }
@Override
public void assertAfterTest() throws IOException {
super.assertAfterTest();
for (NodeEnvironment env : this.getInstances(NodeEnvironment.class)) {
Set<ShardId> shardIds = env.lockedShards();
for (ShardId id : shardIds) {
try {
env.shardLock(id, TimeUnit.SECONDS.toMillis(5)).close();
} catch (IOException ex) {
fail("Shard " + id + " is still locked after 5 sec waiting");
}
}
}
}
} }

View File

@ -80,7 +80,7 @@ public abstract class TestCluster implements Iterable<Client>, Closeable {
/** /**
* This method checks all the things that need to be checked after each test * This method checks all the things that need to be checked after each test
*/ */
public void assertAfterTest() { public void assertAfterTest() throws IOException {
assertAllSearchersClosed(); assertAllSearchersClosed();
assertAllFilesClosed(); assertAllFilesClosed();
ensureEstimatedStats(); ensureEstimatedStats();

View File

@ -40,6 +40,7 @@ import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.index.shard.service.InternalIndexShard; import org.elasticsearch.index.shard.service.InternalIndexShard;
import org.elasticsearch.index.store.IndexStore; import org.elasticsearch.index.store.IndexStore;
import org.elasticsearch.index.store.Store; import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.store.distributor.Distributor;
import org.elasticsearch.index.store.fs.FsDirectoryService; import org.elasticsearch.index.store.fs.FsDirectoryService;
import org.elasticsearch.indices.IndicesLifecycle; import org.elasticsearch.indices.IndicesLifecycle;
import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.IndicesService;
@ -149,6 +150,7 @@ public class MockFSDirectoryService extends FsDirectoryService {
} catch (Exception e) { } catch (Exception e) {
logger.warn("failed to check index", e); logger.warn("failed to check index", e);
} finally { } finally {
logger.info("end check index");
store.decRef(); store.decRef();
} }
} }
@ -168,4 +170,9 @@ public class MockFSDirectoryService extends FsDirectoryService {
public long throttleTimeInNanos() { public long throttleTimeInNanos() {
return delegateService.throttleTimeInNanos(); return delegateService.throttleTimeInNanos();
} }
@Override
public Directory newFromDistributor(Distributor distributor) throws IOException {
return helper.wrap(super.newFromDistributor(distributor));
}
} }

View File

@ -22,7 +22,6 @@ package org.elasticsearch.test.store;
import org.apache.lucene.store.Directory; import org.apache.lucene.store.Directory;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.DirectoryService; import org.elasticsearch.index.store.DirectoryService;
import org.elasticsearch.test.ElasticsearchIntegrationTest; import org.elasticsearch.test.ElasticsearchIntegrationTest;
@ -30,7 +29,7 @@ import org.elasticsearch.test.ElasticsearchIntegrationTest;
import java.io.IOException; import java.io.IOException;
import java.util.Random; import java.util.Random;
public class MockRamDirectoryService extends AbstractIndexShardComponent implements DirectoryService { public class MockRamDirectoryService extends DirectoryService {
private final MockDirectoryHelper helper; private final MockDirectoryHelper helper;
private final DirectoryService delegateService; private final DirectoryService delegateService;