From 1c64a113deb70c55e930421469dee8ca0654613d Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Sun, 16 Nov 2014 14:24:29 +0100 Subject: [PATCH] [CORE] Intorduce shards level locks to prevent concurrent shard modifications Today it's possible that the data directory for a single shard is used by more than on IndexShard->Store instances. While one shard is already closed but has a concurrent recovery running and a new shard is creating it's engine files can conflict and data can potentially be lost. We also remove shards data without checking if there are still users of the files or if files are still open which can cause pending writes / flushes or the delete operation to fail. If the latter is the case the index might be treated as a dangeling index and is brought back to life at a later point in time. This commit introduces a shard level lock that prevents modifications to the shard data while it's still in use. Locks are created per shard and maintined in NodeEnvironment.java. In contrast to most java concurrency primitives those locks are not reentrant. This commit also adds infrastructure that checks if all shard locks are released after tests. --- .../metadata/MetaDataCreateIndexService.java | 1 - .../common/io/FileSystemUtils.java | 1 + .../elasticsearch/env/NodeEnvironment.java | 441 ++++++++++++++---- .../java/org/elasticsearch/env/ShardLock.java | 77 +++ .../state/meta/LocalGatewayMetaState.java | 24 +- .../gateway/none/NoneGateway.java | 6 +- .../gateway/local/LocalIndexShardGateway.java | 2 +- .../index/service/IndexService.java | 1 + .../index/service/InternalIndexService.java | 412 +++++++++------- .../index/store/DirectoryService.java | 23 +- .../index/store/DistributorDirectory.java | 53 ++- .../org/elasticsearch/index/store/Store.java | 87 ++-- .../index/store/StoreModule.java | 6 +- .../index/store/fs/FsDirectoryService.java | 7 +- .../index/store/fs/FsIndexStore.java | 4 +- .../index/store/ram/RamDirectoryService.java | 4 +- .../elasticsearch/indices/IndicesService.java | 34 ++ .../indices/InternalIndicesService.java | 37 +- .../cluster/IndicesClusterStateService.java | 48 +- .../indices/store/IndicesStore.java | 3 +- .../node/internal/InternalNode.java | 10 +- .../env/NodeEnvironmentTests.java | 305 ++++++++++++ .../engine/internal/InternalEngineTests.java | 9 +- .../merge/policy/MergePolicySettingsTest.java | 5 +- .../index/store/CorruptedFileTest.java | 2 +- .../index/store/DistributorDirectoryTest.java | 18 +- .../index/store/DistributorInTheWildTest.java | 13 +- .../elasticsearch/index/store/StoreTest.java | 53 ++- .../store/distributor/DistributorTests.java | 6 +- .../indexing/IndexActionTests.java | 6 +- .../elasticsearch/test/DummyShardLock.java | 37 ++ .../test/InternalTestCluster.java | 23 + .../org/elasticsearch/test/TestCluster.java | 2 +- .../test/store/MockFSDirectoryService.java | 7 + .../test/store/MockRamDirectoryService.java | 3 +- 35 files changed, 1364 insertions(+), 406 deletions(-) create mode 100644 src/main/java/org/elasticsearch/env/ShardLock.java create mode 100644 src/test/java/org/elasticsearch/env/NodeEnvironmentTests.java create mode 100644 src/test/java/org/elasticsearch/test/DummyShardLock.java diff --git a/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java b/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java index 6747e6a3edc..f20cd603a92 100644 --- a/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java +++ b/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java @@ -53,7 +53,6 @@ import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.regex.Regex; import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.XContentParser; diff --git a/src/main/java/org/elasticsearch/common/io/FileSystemUtils.java b/src/main/java/org/elasticsearch/common/io/FileSystemUtils.java index 33d9c4e48bc..61ed554f251 100644 --- a/src/main/java/org/elasticsearch/common/io/FileSystemUtils.java +++ b/src/main/java/org/elasticsearch/common/io/FileSystemUtils.java @@ -82,6 +82,7 @@ public class FileSystemUtils { * in the input array using {@link java.io.File#toPath()} * @param files the files to get paths for */ + @Deprecated // this is only a transition API public static Path[] toPaths(File... files) { Path[] paths = new Path[files.length]; for (int i = 0; i < files.length; i++) { diff --git a/src/main/java/org/elasticsearch/env/NodeEnvironment.java b/src/main/java/org/elasticsearch/env/NodeEnvironment.java index ec86d55d6ea..d61c6cbd220 100644 --- a/src/main/java/org/elasticsearch/env/NodeEnvironment.java +++ b/src/main/java/org/elasticsearch/env/NodeEnvironment.java @@ -19,79 +19,80 @@ package org.elasticsearch.env; +import com.google.common.collect.ImmutableSet; import com.google.common.collect.Sets; import com.google.common.primitives.Ints; import org.apache.lucene.store.Lock; +import org.apache.lucene.store.LockObtainFailedException; import org.apache.lucene.store.NativeFSLockFactory; import org.apache.lucene.util.IOUtils; import org.elasticsearch.ElasticsearchIllegalStateException; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.io.FileSystemUtils; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.index.Index; import org.elasticsearch.index.shard.ShardId; +import java.io.Closeable; import java.io.File; import java.io.IOException; -import java.nio.file.AtomicMoveNotSupportedException; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.StandardCopyOption; -import java.nio.file.attribute.FileAttribute; -import java.util.Arrays; -import java.util.Set; +import java.nio.file.*; +import java.util.*; +import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; /** - * + * A component that holds all data paths for a single node. */ -public class NodeEnvironment extends AbstractComponent { - - private final File[] nodeFiles; - private final File[] nodeIndicesLocations; +public class NodeEnvironment extends AbstractComponent implements Closeable{ + /* ${data.paths}/nodes/{node.id} */ + private final Path[] nodePaths; + /* ${data.paths}/nodes/{node.id}/indices */ + private final Path[] nodeIndicesPaths; private final Lock[] locks; private final int localNodeId; private final AtomicBoolean closed = new AtomicBoolean(false); + private final Map shardLocks = new HashMap<>(); @Inject - public NodeEnvironment(Settings settings, Environment environment) { + public NodeEnvironment(Settings settings, Environment environment) throws IOException { super(settings); if (!DiscoveryNode.nodeRequiresLocalStorage(settings)) { - nodeFiles = null; - nodeIndicesLocations = null; + nodePaths = null; + nodeIndicesPaths = null; locks = null; localNodeId = -1; return; } - File[] nodesFiles = new File[environment.dataWithClusterFiles().length]; - Lock[] locks = new Lock[environment.dataWithClusterFiles().length]; + final Path[] nodePaths = new Path[environment.dataWithClusterFiles().length]; + final Lock[] locks = new Lock[environment.dataWithClusterFiles().length]; int localNodeId = -1; IOException lastException = null; int maxLocalStorageNodes = settings.getAsInt("node.max_local_storage_nodes", 50); for (int possibleLockId = 0; possibleLockId < maxLocalStorageNodes; possibleLockId++) { for (int dirIndex = 0; dirIndex < environment.dataWithClusterFiles().length; dirIndex++) { - File dir = new File(new File(environment.dataWithClusterFiles()[dirIndex], "nodes"), Integer.toString(possibleLockId)); - if (!dir.exists()) { - FileSystemUtils.mkdirs(dir); + Path dir = environment.dataWithClusterFiles()[dirIndex].toPath().resolve(Paths.get("nodes", Integer.toString(possibleLockId))); + if (Files.exists(dir) == false) { + Files.createDirectories(dir); } - logger.trace("obtaining node lock on {} ...", dir.getAbsolutePath()); + logger.trace("obtaining node lock on {} ...", dir.toAbsolutePath()); try { - NativeFSLockFactory lockFactory = new NativeFSLockFactory(dir.toPath()); + NativeFSLockFactory lockFactory = new NativeFSLockFactory(dir); Lock tmpLock = lockFactory.makeLock("node.lock"); boolean obtained = tmpLock.obtain(); if (obtained) { locks[dirIndex] = tmpLock; - nodesFiles[dirIndex] = dir; + nodePaths[dirIndex] = dir; localNodeId = possibleLockId; } else { - logger.trace("failed to obtain node lock on {}", dir.getAbsolutePath()); + logger.trace("failed to obtain node lock on {}", dir.toAbsolutePath()); // release all the ones that were obtained up until now for (int i = 0; i < locks.length; i++) { if (locks[i] != null) { @@ -102,8 +103,8 @@ public class NodeEnvironment extends AbstractComponent { break; } } catch (IOException e) { - logger.trace("failed to obtain node lock on {}", e, dir.getAbsolutePath()); - lastException = new IOException("failed to obtain lock on " + dir.getAbsolutePath(), e); + logger.trace("failed to obtain node lock on {}", e, dir.toAbsolutePath()); + lastException = new IOException("failed to obtain lock on " + dir.toAbsolutePath(), e); // release all the ones that were obtained up until now for (int i = 0; i < locks.length; i++) { IOUtils.closeWhileHandlingException(locks[i]); @@ -123,21 +124,205 @@ public class NodeEnvironment extends AbstractComponent { this.localNodeId = localNodeId; this.locks = locks; - this.nodeFiles = nodesFiles; + this.nodePaths = nodePaths; + + if (logger.isDebugEnabled()) { - logger.debug("using node location [{}], local_node_id [{}]", nodesFiles, localNodeId); + logger.debug("using node location [{}], local_node_id [{}]", nodePaths, localNodeId); } if (logger.isTraceEnabled()) { StringBuilder sb = new StringBuilder("node data locations details:\n"); - for (File file : nodesFiles) { - sb.append(" -> ").append(file.getAbsolutePath()).append(", free_space [").append(new ByteSizeValue(file.getFreeSpace())).append("], usable_space [").append(new ByteSizeValue(file.getUsableSpace())).append("]\n"); + for (Path file : nodePaths) { + sb.append(" -> ").append(file.toAbsolutePath()).append(", free_space [").append(new ByteSizeValue(Files.getFileStore(file).getUnallocatedSpace())).append("], usable_space [").append(new ByteSizeValue(Files.getFileStore(file).getUsableSpace())).append("]\n"); } logger.trace(sb.toString()); } - this.nodeIndicesLocations = new File[nodeFiles.length]; - for (int i = 0; i < nodeFiles.length; i++) { - nodeIndicesLocations[i] = new File(nodeFiles[i], "indices"); + this.nodeIndicesPaths = new Path[nodePaths.length]; + for (int i = 0; i < nodePaths.length; i++) { + nodeIndicesPaths[i] = nodePaths[i].resolve("indices"); + } + } + + + + /** + * Deletes a shard data directory iff the shards locks were successfully acquired. + * + * @param shardId the id of the shard to delete to delete + * @throws IOException if an IOException occurs + */ + public void deleteShardDirectorySafe(ShardId shardId) throws IOException { + final Path[] paths = shardPaths(shardId); + try (Closeable lock = shardLock(shardId)) { + IOUtils.rm(paths); + } + } + + /** + * Deletes an indexes data directory recursively iff all of the indexes + * shards locks were successfully acquired. If any of the indexes shard directories can't be locked + * non of the shards will be deleted + * + * @param index the index to delete + * @throws Exception if any of the shards data directories can't be locked or deleted + */ + public void deleteIndexDirectorySafe(Index index) throws IOException { + final List locks = lockAllForIndex(index); + try { + final Path[] indexPaths = new Path[nodeIndicesPaths.length]; + for (int i = 0; i < indexPaths.length; i++) { + indexPaths[i] = nodeIndicesPaths[i].resolve(index.name()); + } + IOUtils.rm(indexPaths); + } finally { + IOUtils.closeWhileHandlingException(locks); + } + } + + + + /** + * Tries to lock all local shards for the given index. If any of the shard locks can't be acquired + * an {@link LockObtainFailedException} is thrown and all previously acquired locks are released. + * + * @param index the index to lock shards for + * @return the {@link ShardLock} instances for this index. + * @throws IOException if an IOException occurs. + */ + public List lockAllForIndex(Index index) throws IOException { + Set allShardIds = findAllShardIds(index); + List allLocks = new ArrayList<>(); + boolean success = false; + try { + for (ShardId shardId : allShardIds) { + allLocks.add(shardLock(shardId)); + } + success = true; + } finally { + if (success == false) { + IOUtils.closeWhileHandlingException(allLocks); + } + } + return allLocks; + } + + /** + * Tries to lock the given shards ID. A shard lock is required to perform any kind of + * write operation on a shards data directory like deleting files, creating a new index writer + * or recover from a different shard instance into it. If the shard lock can not be acquired + * an {@link LockObtainFailedException} is thrown. + * + * Note: this method will return immediately if the lock can't be acquired. + * + * @param id the shard ID to lock + * @return the shard lock. Call {@link ShardLock#close()} to release the lock + * @throws IOException if an IOException occurs. + */ + public ShardLock shardLock(ShardId id) throws IOException { + return shardLock(id, 0); + } + + /** + * Tries to lock the given shards ID. A shard lock is required to perform any kind of + * write operation on a shards data directory like deleting files, creating a new index writer + * or recover from a different shard instance into it. If the shard lock can not be acquired + * an {@link org.apache.lucene.store.LockObtainFailedException} is thrown + * @param id the shard ID to lock + * @param lockTimeoutMS the lock timeout in milliseconds + * @return the shard lock. Call {@link ShardLock#close()} to release the lock + * @throws IOException if an IOException occurs. + */ + public ShardLock shardLock(final ShardId id, long lockTimeoutMS) throws IOException { + final InternalShardLock shardLock; + final boolean acquired; + synchronized (shardLocks) { + if (shardLocks.containsKey(id)) { + shardLock = shardLocks.get(id); + shardLock.incWaitCount(); + acquired = false; + } else { + shardLock = new InternalShardLock(id); + shardLocks.put(id, shardLock); + acquired = true; + } + } + if (acquired == false) { + boolean success = false; + try { + shardLock.acquire(lockTimeoutMS); + success = true; + } finally { + if (success == false) { + shardLock.decWaitCount(); + } + } + } + return new ShardLock(id) { // new instance prevents double closing + @Override + protected void closeInternal() { + shardLock.release(); + } + }; + } + + /** + * Returns all currently lock shards + */ + public Set lockedShards() { + synchronized (this) { + ImmutableSet.Builder builder = ImmutableSet.builder(); + return builder.addAll(shardLocks.keySet()).build(); + } + } + + private final class InternalShardLock { + /* + * This class holds a mutex for exclusive access and timeout / wait semantics + * and a reference count to cleanup the shard lock instance form the internal data + * structure if nobody is waiting for it. the wait count is guarded by the same lock + * that is used to mutate the map holding the shard locks to ensure exclusive access + */ + private final Semaphore mutex = new Semaphore(1); + private int waitCount = 1; // guarded by shardLocks + private ShardId shardId; + + InternalShardLock(ShardId id) { + shardId = id; + mutex.acquireUninterruptibly(); + } + + protected void release() { + mutex.release(); + decWaitCount(); + } + + void incWaitCount() { + synchronized (shardLocks) { + assert waitCount > 0 : "waitCount is " + waitCount + " but should be > 0"; + waitCount++; + } + } + + private void decWaitCount() { + synchronized (shardLocks) { + assert waitCount > 0 : "waitCount is " + waitCount + " but should be > 0"; + if (--waitCount == 0) { + InternalShardLock remove = shardLocks.remove(shardId); + assert remove != null : "Removed lock was null"; + } + } + } + + void acquire(long timeoutInMillis) throws LockObtainFailedException{ + try { + if (mutex.tryAcquire(timeoutInMillis, TimeUnit.MILLISECONDS) == false) { + throw new LockObtainFailedException("Can't lock shard " + shardId + ", timed out after " + timeoutInMillis + "ms"); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new LockObtainFailedException("Can't lock shard " + shardId + ", interrupted", e); + } } } @@ -146,87 +331,119 @@ public class NodeEnvironment extends AbstractComponent { } public boolean hasNodeFile() { - return nodeFiles != null && locks != null; + return nodePaths != null && locks != null; } - public File[] nodeDataLocations() { + /** + * Returns an array of all of the nodes data locations. + * @throws org.elasticsearch.ElasticsearchIllegalStateException if the node is not configured to store local locations + */ + public Path[] nodeDataPaths() { assert assertEnvIsLocked(); - if (nodeFiles == null || locks == null) { + if (nodePaths == null || locks == null) { throw new ElasticsearchIllegalStateException("node is not configured to store local location"); } - return nodeFiles; + return nodePaths; } - public File[] indicesLocations() { - assert assertEnvIsLocked(); - return nodeIndicesLocations; + /** + * Returns an array of all of the nodes data locations. + * @deprecated use {@link #nodeDataPaths()} instead + */ + @Deprecated + public File[] nodeDataLocations() { + return toFiles(nodeDataPaths()); } + /** + * Returns all data paths for the given index. + * @deprecated use {@link #indexPaths(org.elasticsearch.index.Index)} instead + */ + @Deprecated public File[] indexLocations(Index index) { - assert assertEnvIsLocked(); - File[] indexLocations = new File[nodeFiles.length]; - for (int i = 0; i < nodeFiles.length; i++) { - indexLocations[i] = new File(new File(nodeFiles[i], "indices"), index.name()); - } - return indexLocations; + return toFiles(indexPaths(index)); } + /** + * Returns all data paths for the given shards ID + * @deprecated use {@link #shardPaths(org.elasticsearch.index.shard.ShardId)} instead + */ + @Deprecated public File[] shardLocations(ShardId shardId) { + return toFiles(shardPaths(shardId)); + } + + /** + * Returns all data paths for the given index. + */ + public Path[] indexPaths(Index index) { assert assertEnvIsLocked(); - File[] shardLocations = new File[nodeFiles.length]; - for (int i = 0; i < nodeFiles.length; i++) { - shardLocations[i] = new File(new File(new File(nodeFiles[i], "indices"), shardId.index().name()), Integer.toString(shardId.id())); + Path[] indexPaths = new Path[nodeIndicesPaths.length]; + for (int i = 0; i < nodeIndicesPaths.length; i++) { + indexPaths[i] = nodeIndicesPaths[i].resolve(index.name()); + } + return indexPaths; + } + + /** + * Returns all data paths for the given shards ID + */ + public Path[] shardPaths(ShardId shardId) { + assert assertEnvIsLocked(); + final Path[] nodePaths = nodeDataPaths(); + final Path[] shardLocations = new Path[nodePaths.length]; + for (int i = 0; i < nodePaths.length; i++) { + shardLocations[i] = nodePaths[i].resolve(Paths.get("indices", shardId.index().name(), Integer.toString(shardId.id()))); } return shardLocations; } public Set findAllIndices() throws Exception { - if (nodeFiles == null || locks == null) { + if (nodePaths == null || locks == null) { throw new ElasticsearchIllegalStateException("node is not configured to store local location"); } assert assertEnvIsLocked(); Set indices = Sets.newHashSet(); - for (File indicesLocation : nodeIndicesLocations) { - File[] indicesList = indicesLocation.listFiles(); - if (indicesList == null) { - continue; - } - for (File indexLocation : indicesList) { - if (indexLocation.isDirectory()) { - indices.add(indexLocation.getName()); + for (Path indicesLocation : nodeIndicesPaths) { + + if (Files.exists(indicesLocation) && Files.isDirectory(indicesLocation)) { + try (DirectoryStream stream = Files.newDirectoryStream(indicesLocation)) { + for (Path index : stream) { + if (Files.isDirectory(index)) { + indices.add(index.getFileName().toString()); + } + } } } } return indices; } - public Set findAllShardIds() throws Exception { - if (nodeFiles == null || locks == null) { + /** + * Tries to find all allocated shards for the given index or for all indices iff the given index is null + * on the current node. NOTE: This methods is prone to race-conditions on the filesystem layer since it might not + * see directories created concurrently or while it's traversing. + * @param index the index to filter shards for or null if all shards for all indices should be listed + * @return a set of shard IDs + * @throws IOException if an IOException occurs + */ + public Set findAllShardIds(@Nullable final Index index) throws IOException { + if (nodePaths == null || locks == null) { throw new ElasticsearchIllegalStateException("node is not configured to store local location"); } assert assertEnvIsLocked(); - Set shardIds = Sets.newHashSet(); - for (File indicesLocation : nodeIndicesLocations) { - File[] indicesList = indicesLocation.listFiles(); - if (indicesList == null) { - continue; - } - for (File indexLocation : indicesList) { - if (!indexLocation.isDirectory()) { - continue; - } - String indexName = indexLocation.getName(); - File[] shardsList = indexLocation.listFiles(); - if (shardsList == null) { - continue; - } - for (File shardLocation : shardsList) { - if (!shardLocation.isDirectory()) { - continue; - } - Integer shardId = Ints.tryParse(shardLocation.getName()); - if (shardId != null) { - shardIds.add(new ShardId(indexName, shardId)); + return findAllShardIds(index == null ? null : index.getName(), nodeIndicesPaths); + } + + private static Set findAllShardIds(@Nullable final String index, Path... locations) throws IOException { + final Set shardIds = Sets.newHashSet(); + for (final Path location : locations) { + if (Files.exists(location) && Files.isDirectory(location)) { + try (DirectoryStream indexStream = Files.newDirectoryStream(location)) { + for (Path indexPath : indexStream) { + if (index == null || index.equals(indexPath.getFileName().toString())) { + shardIds.addAll(findAllShardsForIndex(indexPath)); + } } } } @@ -234,6 +451,36 @@ public class NodeEnvironment extends AbstractComponent { return shardIds; } + private static Set findAllShardsForIndex(Path indexPath) throws IOException { + Set shardIds = new HashSet<>(); + if (Files.exists(indexPath) && Files.isDirectory(indexPath)) { + try (DirectoryStream stream = Files.newDirectoryStream(indexPath)) { + String currentIndex = indexPath.getFileName().toString(); + for (Path shardPath : stream) { + if (Files.exists(shardPath) && Files.isDirectory(shardPath)) { + Integer shardId = Ints.tryParse(shardPath.getFileName().toString()); + if (shardId != null) { + shardIds.add(new ShardId(currentIndex, shardId)); + } + } + } + } + } + return shardIds; + } + + /** + * Tries to find all allocated shards for all indices iff the given index on the current node. NOTE: This methods + * is prone to race-conditions on the filesystem layer since it might not see directories created concurrently or + * while it's traversing. + * + * @return a set of shard IDs + * @throws IOException if an IOException occurs + */ + public Set findAllShardIds() throws IOException { + return findAllShardIds(null); + } + public void close() { if (closed.compareAndSet(false, true) && locks != null) { for (Lock lock : locks) { @@ -269,19 +516,37 @@ public class NodeEnvironment extends AbstractComponent { * This method cleans up all files even in the case of an error. */ public void ensureAtomicMoveSupported() throws IOException { - for (File file : nodeFiles) { - assert file.isDirectory(); - final Path src = new File(file, "__es__.tmp").toPath(); + final Path[] nodePaths = nodeDataPaths(); + for (Path directory : nodePaths) { + assert Files.isDirectory(directory) : directory + " is not a directory"; + final Path src = directory.resolve("__es__.tmp"); Files.createFile(src); - final Path target = new File(file, "__es__.final").toPath(); + final Path target = directory.resolve("__es__.final"); try { Files.move(src, target, StandardCopyOption.ATOMIC_MOVE); } catch (AtomicMoveNotSupportedException ex) { - throw new ElasticsearchIllegalStateException("atomic_move is not supported by the filesystem on path [" + file.getCanonicalPath() + "] atomic_move is required for elasticsearch to work correctly.", ex); + throw new ElasticsearchIllegalStateException("atomic_move is not supported by the filesystem on path [" + directory + "] atomic_move is required for elasticsearch to work correctly.", ex); } finally { Files.deleteIfExists(src); Files.deleteIfExists(target); } } } + + + /** + * Returns an array of {@link File} build from the correspondent element + * in the input array using {@link java.nio.file.Path#toFile()} )} + * @param files the files to get paths for + */ + @Deprecated // this is only a transition API + private static File[] toFiles(Path... files) { + File[] paths = new File[files.length]; + for (int i = 0; i < files.length; i++) { + paths[i] = files[i].toFile(); + } + return paths; + } + + } diff --git a/src/main/java/org/elasticsearch/env/ShardLock.java b/src/main/java/org/elasticsearch/env/ShardLock.java new file mode 100644 index 00000000000..13561f89ab0 --- /dev/null +++ b/src/main/java/org/elasticsearch/env/ShardLock.java @@ -0,0 +1,77 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.env; + +import org.apache.lucene.store.Lock; +import org.apache.lucene.util.IOUtils; +import org.elasticsearch.index.shard.ShardId; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Arrays; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * A shard lock guarantees exclusive access to a shards data + * directory. Internal processes should acquire a lock on a shard + * before executing any write operations on the shards data directory. + * + * @see org.elasticsearch.env.NodeEnvironment + */ +public abstract class ShardLock implements Closeable { + + private final ShardId shardId; + private final AtomicBoolean closed = new AtomicBoolean(false); + + public ShardLock(ShardId id) { + this.shardId = id; + } + + /** + * Returns the locks shards Id. + */ + public final ShardId getShardId() { + return shardId; + } + + @Override + public final void close() throws IOException { + if (this.closed.compareAndSet(false, true)) { + closeInternal(); + } + } + + protected abstract void closeInternal(); + + /** + * Returns true if this lock is still open ie. has not been closed yet. + */ + public final boolean isOpen() { + return closed.get() == false; + } + + @Override + public String toString() { + return "ShardLock{" + + "shardId=" + shardId + + '}'; + } + +} diff --git a/src/main/java/org/elasticsearch/gateway/local/state/meta/LocalGatewayMetaState.java b/src/main/java/org/elasticsearch/gateway/local/state/meta/LocalGatewayMetaState.java index 1280a5d0475..81789c8daf5 100644 --- a/src/main/java/org/elasticsearch/gateway/local/state/meta/LocalGatewayMetaState.java +++ b/src/main/java/org/elasticsearch/gateway/local/state/meta/LocalGatewayMetaState.java @@ -21,6 +21,7 @@ package org.elasticsearch.gateway.local.state.meta; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import org.apache.lucene.store.LockObtainFailedException; import org.apache.lucene.util.IOUtils; import org.elasticsearch.ElasticsearchIllegalArgumentException; import org.elasticsearch.ElasticsearchIllegalStateException; @@ -36,7 +37,6 @@ import org.elasticsearch.cluster.routing.operation.hash.djb.DjbHashFunction; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.io.FileSystemUtils; import org.elasticsearch.common.io.Streams; import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.Settings; @@ -246,9 +246,11 @@ public class LocalGatewayMetaState extends AbstractComponent implements ClusterS logger.debug("[{}] deleting index that is no longer part of the metadata (indices: [{}])", current.index(), newMetaData.indices().keys()); if (nodeEnv.hasNodeFile()) { try { - IOUtils.rm(FileSystemUtils.toPaths(nodeEnv.indexLocations(new Index(current.index())))); + nodeEnv.deleteIndexDirectorySafe(new Index(current.index())); + } catch (LockObtainFailedException ex) { + logger.debug("[{}] failed to delete index - at least one shards is still locked", ex, current.index()); } catch (Exception ex) { - logger.debug("[{}] failed to delete index", ex, current.index()); + logger.warn("[{}] failed to delete index", ex, current.index()); } } try { @@ -282,16 +284,24 @@ public class LocalGatewayMetaState extends AbstractComponent implements ClusterS // already dangling, continue continue; } - IndexMetaData indexMetaData = loadIndexState(indexName); + final IndexMetaData indexMetaData = loadIndexState(indexName); if (indexMetaData != null) { if (danglingTimeout.millis() == 0) { logger.info("[{}] dangling index, exists on local file system, but not in cluster metadata, timeout set to 0, deleting now", indexName); try { - IOUtils.rm(FileSystemUtils.toPaths(nodeEnv.indexLocations(new Index(indexName)))); + nodeEnv.deleteIndexDirectorySafe(new Index(indexName)); + } catch (LockObtainFailedException ex) { + logger.debug("[{}] failed to delete index - at least one shards is still locked", ex, indexName); } catch (Exception ex) { - logger.debug("[{}] failed to delete dangling index", ex, indexName); + logger.warn("[{}] failed to delete dangling index", ex, indexName); } } else { + try { // the index deletion might not have worked due to shards still being locked + IOUtils.closeWhileHandlingException(nodeEnv.lockAllForIndex(new Index(indexName))); + } catch (IOException ex) { + logger.warn("[{}] skipping locked dangling index, exists on local file system, but not in cluster metadata, auto import to cluster state is set to [{}]", ex, indexName, autoImportDangled); + continue; + } logger.info("[{}] dangling index, exists on local file system, but not in cluster metadata, scheduling to delete in [{}], auto import to cluster state [{}]", indexName, danglingTimeout, autoImportDangled); danglingIndices.put(indexName, new DanglingIndex(indexName, threadPool.schedule(danglingTimeout, ThreadPool.Names.SAME, new RemoveDanglingIndex(indexName)))); } @@ -591,7 +601,7 @@ public class LocalGatewayMetaState extends AbstractComponent implements ClusterS } logger.warn("[{}] deleting dangling index", index); try { - IOUtils.rm(FileSystemUtils.toPaths(nodeEnv.indexLocations(new Index(index)))); + nodeEnv.deleteIndexDirectorySafe(new Index(index)); } catch (Exception ex) { logger.debug("failed to delete dangling index", ex); } diff --git a/src/main/java/org/elasticsearch/gateway/none/NoneGateway.java b/src/main/java/org/elasticsearch/gateway/none/NoneGateway.java index 7ecbc05e9e3..07c208f7c14 100644 --- a/src/main/java/org/elasticsearch/gateway/none/NoneGateway.java +++ b/src/main/java/org/elasticsearch/gateway/none/NoneGateway.java @@ -19,7 +19,6 @@ package org.elasticsearch.gateway.none; -import org.apache.lucene.util.IOUtils; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.cluster.*; import org.elasticsearch.cluster.action.index.NodeIndexDeletedAction; @@ -29,7 +28,6 @@ import org.elasticsearch.common.Nullable; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Module; -import org.elasticsearch.common.io.FileSystemUtils; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.gateway.Gateway; @@ -44,7 +42,6 @@ public class NoneGateway extends AbstractLifecycleComponent implements public static final String TYPE = "none"; - private final ClusterService clusterService; private final NodeEnvironment nodeEnv; private final NodeIndexDeletedAction nodeIndexDeletedAction; private final ClusterName clusterName; @@ -55,7 +52,6 @@ public class NoneGateway extends AbstractLifecycleComponent implements @Inject public NoneGateway(Settings settings, ClusterService clusterService, NodeEnvironment nodeEnv, NodeIndexDeletedAction nodeIndexDeletedAction, ClusterName clusterName) { super(settings); - this.clusterService = clusterService; this.nodeEnv = nodeEnv; this.nodeIndexDeletedAction = nodeIndexDeletedAction; this.clusterName = clusterName; @@ -119,7 +115,7 @@ public class NoneGateway extends AbstractLifecycleComponent implements logger.debug("[{}] deleting index that is no longer part of the metadata (indices: [{}])", current.index(), newMetaData.indices().keys()); if (nodeEnv.hasNodeFile()) { try { - IOUtils.rm(FileSystemUtils.toPaths(nodeEnv.indexLocations(new Index(current.index())))); + nodeEnv.deleteIndexDirectorySafe(new Index(current.index())); } catch (Exception ex) { logger.debug("failed to delete shard locations", ex); } diff --git a/src/main/java/org/elasticsearch/index/gateway/local/LocalIndexShardGateway.java b/src/main/java/org/elasticsearch/index/gateway/local/LocalIndexShardGateway.java index 25a09186fb5..89473ee281a 100644 --- a/src/main/java/org/elasticsearch/index/gateway/local/LocalIndexShardGateway.java +++ b/src/main/java/org/elasticsearch/index/gateway/local/LocalIndexShardGateway.java @@ -292,7 +292,7 @@ public class LocalIndexShardGateway extends AbstractIndexShardComponent implemen indexShard.performRecoveryFinalization(true); try { - Files.delete(recoveringTranslogFile.toPath()); + Files.deleteIfExists(recoveringTranslogFile.toPath()); } catch (Exception ex) { logger.debug("Failed to delete recovering translog file {}", ex, recoveringTranslogFile); } diff --git a/src/main/java/org/elasticsearch/index/service/IndexService.java b/src/main/java/org/elasticsearch/index/service/IndexService.java index 0e75857a5ef..4369b42bd93 100644 --- a/src/main/java/org/elasticsearch/index/service/IndexService.java +++ b/src/main/java/org/elasticsearch/index/service/IndexService.java @@ -106,4 +106,5 @@ public interface IndexService extends IndexComponent, Iterable { Injector shardInjectorSafe(int shardId) throws IndexShardMissingException; String indexUUID(); + } diff --git a/src/main/java/org/elasticsearch/index/service/InternalIndexService.java b/src/main/java/org/elasticsearch/index/service/InternalIndexService.java index b4d71f618b4..3bbe5e3cc4e 100644 --- a/src/main/java/org/elasticsearch/index/service/InternalIndexService.java +++ b/src/main/java/org/elasticsearch/index/service/InternalIndexService.java @@ -21,14 +21,19 @@ package org.elasticsearch.index.service; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Sets; import com.google.common.collect.UnmodifiableIterator; +import org.apache.lucene.util.IOUtils; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchIllegalStateException; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.inject.*; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.AbstractRunnable; +import org.elasticsearch.common.util.concurrent.CountDown; import org.elasticsearch.env.NodeEnvironment; +import org.elasticsearch.env.ShardLock; import org.elasticsearch.index.*; import org.elasticsearch.index.aliases.IndexAliasesService; import org.elasticsearch.index.analysis.AnalysisService; @@ -75,15 +80,22 @@ import org.elasticsearch.index.translog.Translog; import org.elasticsearch.index.translog.TranslogModule; import org.elasticsearch.index.translog.TranslogService; import org.elasticsearch.indices.IndicesLifecycle; +import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.InternalIndicesLifecycle; import org.elasticsearch.plugins.PluginsService; import org.elasticsearch.plugins.ShardsPluginsModule; import org.elasticsearch.threadpool.ThreadPool; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import static com.google.common.collect.Maps.newHashMap; import static org.elasticsearch.common.collect.MapBuilder.newMapBuilder; @@ -127,18 +139,20 @@ public class InternalIndexService extends AbstractIndexComponent implements Inde private final IndexSettingsService settingsService; + private final NodeEnvironment nodeEnv; + private volatile ImmutableMap shardsInjectors = ImmutableMap.of(); private volatile ImmutableMap shards = ImmutableMap.of(); - private volatile boolean closed = false; + private final AtomicBoolean closed = new AtomicBoolean(false); @Inject public InternalIndexService(Injector injector, Index index, @IndexSettings Settings indexSettings, NodeEnvironment nodeEnv, ThreadPool threadPool, AnalysisService analysisService, MapperService mapperService, IndexQueryParserService queryParserService, SimilarityService similarityService, IndexAliasesService aliasesService, IndexCache indexCache, IndexEngine indexEngine, IndexGateway indexGateway, IndexStore indexStore, IndexSettingsService settingsService, IndexFieldDataService indexFieldData, - BitsetFilterCache bitSetFilterCache) { + BitsetFilterCache bitSetFilterCache ) { super(index, indexSettings); this.injector = injector; this.threadPool = threadPool; @@ -163,6 +177,7 @@ public class InternalIndexService extends AbstractIndexComponent implements Inde indexCache.filter().setIndexService(this); indexFieldData.setIndexService(this); bitSetFilterCache.setIndexService(this); + this.nodeEnv = nodeEnv; } @Override @@ -264,33 +279,37 @@ public class InternalIndexService extends AbstractIndexComponent implements Inde return indexEngine; } - public void close(final String reason, @Nullable Executor executor) { - synchronized (this) { - closed = true; - } - Set shardIds = shardIds(); - final CountDownLatch latch = new CountDownLatch(shardIds.size()); - for (final int shardId : shardIds) { - executor = executor == null ? threadPool.generic() : executor; - executor.execute(new Runnable() { - @Override - public void run() { - try { - removeShard(shardId, reason); - } catch (Throwable e) { - logger.warn("failed to close shard", e); - } finally { - latch.countDown(); - } - } - }); - } - try { - latch.await(); - } catch (InterruptedException e) { - logger.debug("Interrupted closing index [{}]", e, index().name()); - Thread.currentThread().interrupt(); - } + public void close(final String reason, @Nullable Executor executor, final IndicesService.IndexCloseListener listener) { + if (closed.compareAndSet(false, true)) { + final Set shardIds = shardIds(); + final CountDownLatch latch = new CountDownLatch(shardIds.size()); + final IndicesService.IndexCloseListener innerListener = listener == null ? null : + new PerShardIndexCloseListener(shardIds, listener); + for (final int shardId : shardIds) { + executor = executor == null ? threadPool.generic() : executor; + executor.execute(new AbstractRunnable() { + @Override + public void onFailure(Throwable t) { + logger.warn("failed to close shard", t); + } + + @Override + public void doRun() { + try { + removeShard(shardId, reason, innerListener); + } finally { + latch.countDown(); + } + } + }); + } + try { + latch.await(); + } catch (InterruptedException e) { + logger.debug("Interrupted closing index [{}]", e, index().name()); + Thread.currentThread().interrupt(); + } + } } @Override @@ -319,153 +338,226 @@ public class InternalIndexService extends AbstractIndexComponent implements Inde * be able to serialize the execution via the cluster state in the future. for now we just * keep it synced. */ - if (closed) { + if (closed.get()) { throw new ElasticsearchIllegalStateException("Can't create shard [" + index.name() + "][" + sShardId + "], closed"); } ShardId shardId = new ShardId(index, sShardId); - if (shardsInjectors.containsKey(shardId.id())) { - throw new IndexShardAlreadyExistsException(shardId + " already exists"); - } - - indicesLifecycle.beforeIndexShardCreated(shardId); - - logger.debug("creating shard_id [{}]", shardId.id()); - - ModulesBuilder modules = new ModulesBuilder(); - modules.add(new ShardsPluginsModule(indexSettings, pluginsService)); - modules.add(new IndexShardModule(indexSettings, shardId)); - modules.add(new ShardIndexingModule()); - modules.add(new ShardSearchModule()); - modules.add(new ShardGetModule()); - modules.add(new StoreModule(indexSettings, injector.getInstance(IndexStore.class))); - modules.add(new DeletionPolicyModule(indexSettings)); - modules.add(new MergePolicyModule(indexSettings)); - modules.add(new MergeSchedulerModule(indexSettings)); - modules.add(new ShardFilterCacheModule()); - modules.add(new ShardQueryCacheModule()); - modules.add(new ShardBitsetFilterCacheModule()); - modules.add(new ShardFieldDataModule()); - modules.add(new TranslogModule(indexSettings)); - modules.add(new EngineModule(indexSettings)); - modules.add(new IndexShardGatewayModule(injector.getInstance(IndexGateway.class))); - modules.add(new PercolatorShardModule()); - modules.add(new ShardTermVectorModule()); - modules.add(new IndexShardSnapshotModule()); - modules.add(new SuggestShardModule()); - - Injector shardInjector; + ShardLock lock = null; + boolean success = false; try { - shardInjector = modules.createChildInjector(injector); - } catch (CreationException e) { - throw new IndexShardCreationException(shardId, Injectors.getFirstErrorFailure(e)); - } catch (Throwable e) { - throw new IndexShardCreationException(shardId, e); + lock = nodeEnv.shardLock(shardId, TimeUnit.SECONDS.toMillis(5)); + if (shardsInjectors.containsKey(shardId.id())) { + throw new IndexShardAlreadyExistsException(shardId + " already exists"); + } + + indicesLifecycle.beforeIndexShardCreated(shardId); + + logger.debug("creating shard_id {}", shardId); + + ModulesBuilder modules = new ModulesBuilder(); + modules.add(new ShardsPluginsModule(indexSettings, pluginsService)); + modules.add(new IndexShardModule(indexSettings, shardId)); + modules.add(new ShardIndexingModule()); + modules.add(new ShardSearchModule()); + modules.add(new ShardGetModule()); + modules.add(new StoreModule(indexSettings, injector.getInstance(IndexStore.class), lock)); + modules.add(new DeletionPolicyModule(indexSettings)); + modules.add(new MergePolicyModule(indexSettings)); + modules.add(new MergeSchedulerModule(indexSettings)); + modules.add(new ShardFilterCacheModule()); + modules.add(new ShardQueryCacheModule()); + modules.add(new ShardBitsetFilterCacheModule()); + modules.add(new ShardFieldDataModule()); + modules.add(new TranslogModule(indexSettings)); + modules.add(new EngineModule(indexSettings)); + modules.add(new IndexShardGatewayModule(injector.getInstance(IndexGateway.class))); + modules.add(new PercolatorShardModule()); + modules.add(new ShardTermVectorModule()); + modules.add(new IndexShardSnapshotModule()); + modules.add(new SuggestShardModule()); + + Injector shardInjector; + try { + shardInjector = modules.createChildInjector(injector); + } catch (CreationException e) { + throw new IndexShardCreationException(shardId, Injectors.getFirstErrorFailure(e)); + } catch (Throwable e) { + throw new IndexShardCreationException(shardId, e); + } + + shardsInjectors = newMapBuilder(shardsInjectors).put(shardId.id(), shardInjector).immutableMap(); + + IndexShard indexShard = shardInjector.getInstance(IndexShard.class); + indicesLifecycle.indexShardStateChanged(indexShard, null, "shard created"); + indicesLifecycle.afterIndexShardCreated(indexShard); + + shards = newMapBuilder(shards).put(shardId.id(), indexShard).immutableMap(); + success = true; + return indexShard; + } catch (IOException ex) { + throw new IndexShardCreationException(shardId, ex); + } finally { + if (success == false) { + IOUtils.closeWhileHandlingException(lock); + } } - - shardsInjectors = newMapBuilder(shardsInjectors).put(shardId.id(), shardInjector).immutableMap(); - - IndexShard indexShard = shardInjector.getInstance(IndexShard.class); - - indicesLifecycle.indexShardStateChanged(indexShard, null, "shard created"); - indicesLifecycle.afterIndexShardCreated(indexShard); - - shards = newMapBuilder(shards).put(shardId.id(), indexShard).immutableMap(); - - return indexShard; } @Override - public synchronized void removeShard(int shardId, String reason) throws ElasticsearchException { - final Injector shardInjector; - final IndexShard indexShard; - final ShardId sId = new ShardId(index, shardId); - Map tmpShardInjectors = newHashMap(shardsInjectors); - shardInjector = tmpShardInjectors.remove(shardId); - if (shardInjector == null) { - return; - } + public void removeShard(int shardId, String reason) throws ElasticsearchException { + removeShard(shardId, reason, null); + } - logger.debug("[{}] closing... (reason: [{}])", shardId, reason); - shardsInjectors = ImmutableMap.copyOf(tmpShardInjectors); - Map tmpShardsMap = newHashMap(shards); - indexShard = tmpShardsMap.remove(shardId); - shards = ImmutableMap.copyOf(tmpShardsMap); - indicesLifecycle.beforeIndexShardClosed(sId, indexShard); - for (Class closeable : pluginsService.shardServices()) { - try { - shardInjector.getInstance(closeable).close(); - } catch (Throwable e) { - logger.debug("[{}] failed to clean plugin shard service [{}]", e, shardId, closeable); - } - } + public synchronized void removeShard(int shardId, String reason, @Nullable final IndicesService.IndexCloseListener listener) throws ElasticsearchException { + boolean listenerPassed = false; + final ShardId sId = new ShardId(index, shardId); try { - // now we can close the translog service, we need to close it before the we close the shard - shardInjector.getInstance(TranslogService.class).close(); - } catch (Throwable e) { - logger.debug("[{}] failed to close translog service", e, shardId); - // ignore - } - // this logic is tricky, we want to close the engine so we rollback the changes done to it - // and close the shard so no operations are allowed to it - if (indexShard != null) { + final Injector shardInjector; + final IndexShard indexShard; + Map tmpShardInjectors = newHashMap(shardsInjectors); + shardInjector = tmpShardInjectors.remove(shardId); + if (shardInjector == null) { + return; + } + + logger.debug("[{}] closing... (reason: [{}])", shardId, reason); + shardsInjectors = ImmutableMap.copyOf(tmpShardInjectors); + Map tmpShardsMap = newHashMap(shards); + indexShard = tmpShardsMap.remove(shardId); + shards = ImmutableMap.copyOf(tmpShardsMap); + indicesLifecycle.beforeIndexShardClosed(sId, indexShard); + for (Class closeable : pluginsService.shardServices()) { + try { + shardInjector.getInstance(closeable).close(); + } catch (Throwable e) { + logger.debug("[{}] failed to clean plugin shard service [{}]", e, shardId, closeable); + } + } try { - ((InternalIndexShard) indexShard).close(reason); + // now we can close the translog service, we need to close it before the we close the shard + shardInjector.getInstance(TranslogService.class).close(); } catch (Throwable e) { - logger.debug("[{}] failed to close index shard", e, shardId); + logger.debug("[{}] failed to close translog service", e, shardId); // ignore } - } - try { - shardInjector.getInstance(Engine.class).close(); - } catch (Throwable e) { - logger.debug("[{}] failed to close engine", e, shardId); - // ignore - } - try { - shardInjector.getInstance(MergeSchedulerProvider.class).close(); - } catch (Throwable e) { - logger.debug("[{}] failed to close merge policy scheduler", e, shardId); - // ignore - } - try { - shardInjector.getInstance(MergePolicyProvider.class).close(); - } catch (Throwable e) { - logger.debug("[{}] failed to close merge policy provider", e, shardId); - // ignore - } - try { - shardInjector.getInstance(IndexShardGatewayService.class).close(); - } catch (Throwable e) { - logger.debug("[{}] failed to close index shard gateway", e, shardId); - // ignore - } - try { - // now we can close the translog - shardInjector.getInstance(Translog.class).close(); - } catch (Throwable e) { - logger.debug("[{}] failed to close translog", e, shardId); - // ignore - } - try { - // now we can close the translog - shardInjector.getInstance(PercolatorQueriesRegistry.class).close(); - } catch (Throwable e) { - logger.debug("[{}] failed to close PercolatorQueriesRegistry", e, shardId); - // ignore + // this logic is tricky, we want to close the engine so we rollback the changes done to it + // and close the shard so no operations are allowed to it + if (indexShard != null) { + try { + ((InternalIndexShard) indexShard).close(reason); + } catch (Throwable e) { + logger.debug("[{}] failed to close index shard", e, shardId); + // ignore + } + } + try { + shardInjector.getInstance(Engine.class).close(); + } catch (Throwable e) { + logger.debug("[{}] failed to close engine", e, shardId); + // ignore + } + try { + shardInjector.getInstance(MergeSchedulerProvider.class).close(); + } catch (Throwable e) { + logger.debug("[{}] failed to close merge policy scheduler", e, shardId); + // ignore + } + try { + shardInjector.getInstance(MergePolicyProvider.class).close(); + } catch (Throwable e) { + logger.debug("[{}] failed to close merge policy provider", e, shardId); + // ignore + } + try { + shardInjector.getInstance(IndexShardGatewayService.class).close(); + } catch (Throwable e) { + logger.debug("[{}] failed to close index shard gateway", e, shardId); + // ignore + } + try { + // now we can close the translog + shardInjector.getInstance(Translog.class).close(); + } catch (Throwable e) { + logger.debug("[{}] failed to close translog", e, shardId); + // ignore + } + try { + // now we can close the translog + shardInjector.getInstance(PercolatorQueriesRegistry.class).close(); + } catch (Throwable e) { + logger.debug("[{}] failed to close PercolatorQueriesRegistry", e, shardId); + // ignore + } + + // call this before we close the store, so we can release resources for it + indicesLifecycle.afterIndexShardClosed(sId, indexShard); + // if we delete or have no gateway or the store is not persistent, clean the store... + final Store store = shardInjector.getInstance(Store.class); + // and close it + try { + listenerPassed = true; + if (listener == null) { + store.close(); + } else { + store.close(new Store.OnCloseListener() { + @Override + public void onClose(ShardId shardId) { + listener.onShardClosed(shardId); + } + }); + } + } catch (Throwable e) { + logger.warn("[{}] failed to close store on shard deletion", e, shardId); + } + Injectors.close(injector); + + logger.debug("[{}] closed (reason: [{}])", shardId, reason); + } catch (Throwable t) { + if (listenerPassed == false && listener != null) { // only notify if the listener wasn't passed to the store + listener.onShardCloseFailed(sId, t); + } + throw t; } - // call this before we close the store, so we can release resources for it - indicesLifecycle.afterIndexShardClosed(sId, indexShard); - // if we delete or have no gateway or the store is not persistent, clean the store... - Store store = shardInjector.getInstance(Store.class); - // and close it - try { - store.close(); - } catch (Throwable e) { - logger.warn("[{}] failed to close store on shard deletion", e, shardId); - } - Injectors.close(injector); + } - logger.debug("[{}] closed (reason: [{}])", shardId, reason); + private static final class PerShardIndexCloseListener implements IndicesService.IndexCloseListener { + final CountDown countDown; + final List failures; + private final Set shardIds; + private final IndicesService.IndexCloseListener listener; + + public PerShardIndexCloseListener(Set shardIds, IndicesService.IndexCloseListener listener) { + this.shardIds = shardIds; + this.listener = listener; + countDown = new CountDown(shardIds.size()); + failures = new CopyOnWriteArrayList<>(); + } + + @Override + public void onAllShardsClosed(Index index, List failures) { + assert false : "nobody should call this"; + } + + @Override + public void onShardClosed(ShardId shardId) { + assert countDown.isCountedDown() == false; + assert shardIds.contains(shardId.getId()) : "Unknown shard id"; + listener.onShardClosed(shardId); + if (countDown.countDown()) { + listener.onAllShardsClosed(shardId.index(), failures); + } + } + + @Override + public void onShardCloseFailed(ShardId shardId, Throwable t) { + assert countDown.isCountedDown() == false; + assert shardIds.contains(shardId.getId()) : "Unkown shard id"; + listener.onShardCloseFailed(shardId, t); + failures.add(t); + if (countDown.countDown()) { + listener.onAllShardsClosed(shardId.index(), failures); + } + } } } diff --git a/src/main/java/org/elasticsearch/index/store/DirectoryService.java b/src/main/java/org/elasticsearch/index/store/DirectoryService.java index 3291377835e..7278ecf129f 100644 --- a/src/main/java/org/elasticsearch/index/store/DirectoryService.java +++ b/src/main/java/org/elasticsearch/index/store/DirectoryService.java @@ -20,14 +20,31 @@ package org.elasticsearch.index.store; import org.apache.lucene.store.Directory; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.settings.IndexSettings; +import org.elasticsearch.index.shard.AbstractIndexShardComponent; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.store.distributor.Distributor; import java.io.IOException; /** */ -public interface DirectoryService { +public abstract class DirectoryService extends AbstractIndexShardComponent { - Directory[] build() throws IOException; + protected DirectoryService(ShardId shardId, @IndexSettings Settings indexSettings) { + super(shardId, indexSettings); + } - long throttleTimeInNanos(); + public abstract Directory[] build() throws IOException; + + public abstract long throttleTimeInNanos(); + + /** + * Creates a new Directory from the given distributor. + * The default implementation returns a new {@link org.elasticsearch.index.store.DistributorDirectory} + */ + public Directory newFromDistributor(Distributor distributor) throws IOException { + return new DistributorDirectory(distributor); + } } \ No newline at end of file diff --git a/src/main/java/org/elasticsearch/index/store/DistributorDirectory.java b/src/main/java/org/elasticsearch/index/store/DistributorDirectory.java index 136fa4e1508..b4155cabd5a 100644 --- a/src/main/java/org/elasticsearch/index/store/DistributorDirectory.java +++ b/src/main/java/org/elasticsearch/index/store/DistributorDirectory.java @@ -20,7 +20,6 @@ package org.elasticsearch.index.store; import org.apache.lucene.store.*; import org.apache.lucene.util.IOUtils; -import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.math.MathUtils; import org.elasticsearch.index.store.distributor.Distributor; @@ -128,7 +127,12 @@ public final class DistributorDirectory extends BaseDirectory { @Override public synchronized void close() throws IOException { - IOUtils.close(distributor.all()); + try { + assert assertConsistency(); + } finally { + IOUtils.close(distributor.all()); + } + } /** @@ -183,34 +187,29 @@ public final class DistributorDirectory extends BaseDirectory { /** * Basic checks to ensure the internal mapping is consistent - should only be used in assertions */ - static boolean assertConsistency(ESLogger logger, DistributorDirectory dir) throws IOException { - synchronized (dir) { - boolean consistent = true; - StringBuilder builder = new StringBuilder(); - Directory[] all = dir.distributor.all(); - for (Directory d : all) { - for (String file : d.listAll()) { - final Directory directory = dir.nameDirMapping.get(file); - if (directory == null) { - consistent = false; - builder.append("File ").append(file) - .append(" was not mapped to a directory but exists in one of the distributors directories") - .append(System.lineSeparator()); - } else if (directory != d) { - consistent = false; - builder.append("File ").append(file).append(" was mapped to a directory ").append(directory) - .append(" but exists in another distributor directory").append(d) - .append(System.lineSeparator()); - } - + private synchronized boolean assertConsistency() throws IOException { + boolean consistent = true; + StringBuilder builder = new StringBuilder(); + Directory[] all = distributor.all(); + for (Directory d : all) { + for (String file : d.listAll()) { + final Directory directory = nameDirMapping.get(file); + if (directory == null) { + consistent = false; + builder.append("File ").append(file) + .append(" was not mapped to a directory but exists in one of the distributors directories") + .append(System.lineSeparator()); + } else if (directory != d) { + consistent = false; + builder.append("File ").append(file).append(" was mapped to a directory ").append(directory) + .append(" but exists in another distributor directory").append(d) + .append(System.lineSeparator()); } + } - if (consistent == false) { - logger.info(builder.toString()); - } - assert consistent : builder.toString(); - return consistent; // return boolean so it can be easily be used in asserts } + assert consistent : builder.toString(); + return consistent; // return boolean so it can be easily be used in asserts } /** diff --git a/src/main/java/org/elasticsearch/index/store/Store.java b/src/main/java/org/elasticsearch/index/store/Store.java index aa1e4f4af59..b0bfde80ad5 100644 --- a/src/main/java/org/elasticsearch/index/store/Store.java +++ b/src/main/java/org/elasticsearch/index/store/Store.java @@ -33,8 +33,6 @@ import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Strings; import org.elasticsearch.common.collect.Tuple; -import org.elasticsearch.common.compress.Compressor; -import org.elasticsearch.common.compress.CompressorFactory; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.Streams; import org.elasticsearch.common.logging.ESLogger; @@ -44,6 +42,7 @@ import org.elasticsearch.common.lucene.store.InputStreamIndexInput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.AbstractRefCounted; import org.elasticsearch.common.util.concurrent.RefCounted; +import org.elasticsearch.env.ShardLock; import org.elasticsearch.index.CloseableIndexComponent; import org.elasticsearch.index.codec.CodecService; import org.elasticsearch.index.settings.IndexSettings; @@ -92,8 +91,8 @@ public class Store extends AbstractIndexShardComponent implements CloseableIndex private final CodecService codecService; private final DirectoryService directoryService; private final StoreDirectory directory; - private final DistributorDirectory distributorDirectory; private final ReentrantReadWriteLock metadataLock = new ReentrantReadWriteLock(); + private final ShardLock shardLock; private final AbstractRefCounted refCounter = new AbstractRefCounted("store") { @Override @@ -102,17 +101,19 @@ public class Store extends AbstractIndexShardComponent implements CloseableIndex Store.this.closeInternal(); } }; + private volatile OnCloseListener onClose; @Inject - public Store(ShardId shardId, @IndexSettings Settings indexSettings, CodecService codecService, DirectoryService directoryService, Distributor distributor) throws IOException { + public Store(ShardId shardId, @IndexSettings Settings indexSettings, CodecService codecService, DirectoryService directoryService, Distributor distributor, ShardLock shardLock) throws IOException { super(shardId, indexSettings); this.codecService = codecService; this.directoryService = directoryService; - this.distributorDirectory = new DistributorDirectory(distributor); - this.directory = new StoreDirectory(distributorDirectory); + this.directory = new StoreDirectory(directoryService.newFromDistributor(distributor)); + this.shardLock = shardLock; + assert shardLock != null; + assert shardLock.getShardId().equals(shardId); } - public Directory directory() { ensureOpen(); return directory; @@ -195,7 +196,7 @@ public class Store extends AbstractIndexShardComponent implements CloseableIndex failIfCorrupted(); metadataLock.readLock().lock(); try { - return new MetadataSnapshot(commit, distributorDirectory, logger); + return new MetadataSnapshot(commit, directory, logger); } catch (CorruptIndexException | IndexFormatTooOldException | IndexFormatTooNewException ex) { markStoreCorrupted(ex); throw ex; @@ -258,20 +259,18 @@ public class Store extends AbstractIndexShardComponent implements CloseableIndex */ public void deleteContent() throws IOException { ensureOpen(); - final String[] files = distributorDirectory.listAll(); - IOException lastException = null; + final String[] files = directory.listAll(); + final List exceptions = new ArrayList<>(); for (String file : files) { try { - distributorDirectory.deleteFile(file); + directory.deleteFile(file); } catch (NoSuchFileException | FileNotFoundException e) { // ignore } catch (IOException e) { - lastException = e; + exceptions.add(e); } } - if (lastException != null) { - throw lastException; - } + ExceptionsHelper.rethrowAndSuppress(exceptions); } public StoreStats stats() throws IOException { @@ -281,7 +280,7 @@ public class Store extends AbstractIndexShardComponent implements CloseableIndex public void renameFile(String from, String to) throws IOException { ensureOpen(); - distributorDirectory.renameFile(from, to); + directory.renameFile(from, to); } /** @@ -305,7 +304,7 @@ public class Store extends AbstractIndexShardComponent implements CloseableIndex */ @Override public final void incRef() { - refCounter.incRef(); + refCounter.incRef(); } /** @@ -322,7 +321,7 @@ public class Store extends AbstractIndexShardComponent implements CloseableIndex */ @Override public final boolean tryIncRef() { - return refCounter.tryIncRef(); + return refCounter.tryIncRef(); } /** @@ -332,22 +331,48 @@ public class Store extends AbstractIndexShardComponent implements CloseableIndex */ @Override public final void decRef() { - refCounter.decRef(); + refCounter.decRef(); } @Override public void close() { + close(null); + } + + /** + * Closes this store and installs the given {@link org.elasticsearch.index.store.Store.OnCloseListener} + * to be notified once all references to this store are released and the store is closed. + */ + public void close(@Nullable OnCloseListener onClose) { if (isClosed.compareAndSet(false, true)) { + assert this.onClose == null : "OnClose listener is already set"; + this.onClose = onClose; // only do this once! decRef(); } } private void closeInternal() { + final OnCloseListener listener = onClose; + onClose = null; try { directory.innerClose(); // this closes the distributorDirectory as well } catch (IOException e) { logger.debug("failed to close directory", e); + } finally { + try { + IOUtils.closeWhileHandlingException(shardLock); + } finally { + try { + if (listener != null) { + listener.onClose(shardId); + } + } catch (Exception ex){ + logger.debug("OnCloseListener threw an exception", ex); + } + } + + } } @@ -534,12 +559,7 @@ public class Store extends AbstractIndexShardComponent implements CloseableIndex } private void innerClose() throws IOException { - try { - assert DistributorDirectory.assertConsistency(logger, distributorDirectory); - } finally { - super.close(); - } - + super.close(); } @Override @@ -837,8 +857,8 @@ public class Store extends AbstractIndexShardComponent implements CloseableIndex } public synchronized void write(Store store) throws IOException { - synchronized (store.distributorDirectory) { - Tuple, Long> tuple = MetadataSnapshot.readLegacyChecksums(store.distributorDirectory); + synchronized (store.directory) { + Tuple, Long> tuple = MetadataSnapshot.readLegacyChecksums(store.directory); tuple.v1().putAll(legacyChecksums); if (!tuple.v1().isEmpty()) { writeChecksums(store.directory, tuple.v1(), tuple.v2()); @@ -1101,4 +1121,17 @@ public class Store extends AbstractIndexShardComponent implements CloseableIndex directory().sync(Collections.singleton(uuid)); } } + + /** + * A listener that is called once this store is closed and all references are released + */ + public static interface OnCloseListener { + + /** + * Called once the store is closed and all references are released. + * + * @param shardId the shard ID the calling store belongs to. + */ + public void onClose(ShardId shardId); + } } diff --git a/src/main/java/org/elasticsearch/index/store/StoreModule.java b/src/main/java/org/elasticsearch/index/store/StoreModule.java index c0b024249e9..78c7e532715 100644 --- a/src/main/java/org/elasticsearch/index/store/StoreModule.java +++ b/src/main/java/org/elasticsearch/index/store/StoreModule.java @@ -21,6 +21,7 @@ package org.elasticsearch.index.store; import org.elasticsearch.common.inject.AbstractModule; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.env.ShardLock; import org.elasticsearch.index.store.distributor.Distributor; import org.elasticsearch.index.store.distributor.LeastUsedDistributor; import org.elasticsearch.index.store.distributor.RandomWeightedDistributor; @@ -37,12 +38,14 @@ public class StoreModule extends AbstractModule { private final Settings settings; private final IndexStore indexStore; + private final ShardLock lock; private Class distributor; - public StoreModule(Settings settings, IndexStore indexStore) { + public StoreModule(Settings settings, IndexStore indexStore, ShardLock lock) { this.indexStore = indexStore; this.settings = settings; + this.lock = lock; } public void setDistributor(Class distributor) { @@ -53,6 +56,7 @@ public class StoreModule extends AbstractModule { protected void configure() { bind(DirectoryService.class).to(indexStore.shardDirectory()).asEagerSingleton(); bind(Store.class).asEagerSingleton(); + bind(ShardLock.class).toInstance(lock); if (distributor == null) { distributor = loadDistributor(settings); } diff --git a/src/main/java/org/elasticsearch/index/store/fs/FsDirectoryService.java b/src/main/java/org/elasticsearch/index/store/fs/FsDirectoryService.java index 45cee0743e3..7d9c19e408c 100644 --- a/src/main/java/org/elasticsearch/index/store/fs/FsDirectoryService.java +++ b/src/main/java/org/elasticsearch/index/store/fs/FsDirectoryService.java @@ -20,25 +20,20 @@ package org.elasticsearch.index.store.fs; import org.apache.lucene.store.*; -import org.elasticsearch.ElasticsearchIllegalArgumentException; import org.elasticsearch.common.io.FileSystemUtils; import org.elasticsearch.common.metrics.CounterMetric; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.settings.IndexSettings; -import org.elasticsearch.index.shard.AbstractIndexShardComponent; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.store.DirectoryService; -import org.elasticsearch.index.store.DirectoryUtils; import org.elasticsearch.index.store.IndexStore; import java.io.File; -import java.io.FileNotFoundException; import java.io.IOException; -import java.io.InterruptedIOException; /** */ -public abstract class FsDirectoryService extends AbstractIndexShardComponent implements DirectoryService, StoreRateLimiting.Listener, StoreRateLimiting.Provider { +public abstract class FsDirectoryService extends DirectoryService implements StoreRateLimiting.Listener, StoreRateLimiting.Provider { protected final FsIndexStore indexStore; diff --git a/src/main/java/org/elasticsearch/index/store/fs/FsIndexStore.java b/src/main/java/org/elasticsearch/index/store/fs/FsIndexStore.java index e4e6bb53408..83ca80cd402 100644 --- a/src/main/java/org/elasticsearch/index/store/fs/FsIndexStore.java +++ b/src/main/java/org/elasticsearch/index/store/fs/FsIndexStore.java @@ -19,9 +19,7 @@ package org.elasticsearch.index.store.fs; -import org.apache.lucene.util.IOUtils; import org.elasticsearch.ElasticsearchIllegalStateException; -import org.elasticsearch.common.io.FileSystemUtils; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.index.Index; @@ -83,7 +81,7 @@ public abstract class FsIndexStore extends AbstractIndexStore { throw new ElasticsearchIllegalStateException(shardId + " allocated, can't be deleted"); } try { - IOUtils.rm(FileSystemUtils.toPaths(shardLocations(shardId))); + nodeEnv.deleteShardDirectorySafe(shardId); } catch (Exception ex) { logger.debug("failed to delete shard locations", ex); } diff --git a/src/main/java/org/elasticsearch/index/store/ram/RamDirectoryService.java b/src/main/java/org/elasticsearch/index/store/ram/RamDirectoryService.java index 2eb77e1943e..56a76f38fff 100644 --- a/src/main/java/org/elasticsearch/index/store/ram/RamDirectoryService.java +++ b/src/main/java/org/elasticsearch/index/store/ram/RamDirectoryService.java @@ -25,17 +25,15 @@ import org.apache.lucene.store.RAMFile; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.settings.IndexSettings; -import org.elasticsearch.index.shard.AbstractIndexShardComponent; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.store.DirectoryService; -import org.elasticsearch.index.store.DirectoryUtils; import java.io.FileNotFoundException; import java.io.IOException; /** */ -public final class RamDirectoryService extends AbstractIndexShardComponent implements DirectoryService { +public final class RamDirectoryService extends DirectoryService { @Inject public RamDirectoryService(ShardId shardId, @IndexSettings Settings indexSettings) { diff --git a/src/main/java/org/elasticsearch/indices/IndicesService.java b/src/main/java/org/elasticsearch/indices/IndicesService.java index 955519d2351..8e633668563 100644 --- a/src/main/java/org/elasticsearch/indices/IndicesService.java +++ b/src/main/java/org/elasticsearch/indices/IndicesService.java @@ -25,7 +25,12 @@ import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.component.LifecycleComponent; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.Index; import org.elasticsearch.index.service.IndexService; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.store.Store; + +import java.util.List; /** * @@ -74,4 +79,33 @@ public interface IndicesService extends Iterable, LifecycleCompone IndexService createIndex(String index, Settings settings, String localNodeId) throws ElasticsearchException; void removeIndex(String index, String reason) throws ElasticsearchException; + + void removeIndex(String index, String reason, @Nullable IndexCloseListener listener) throws ElasticsearchException; + + /** + * A listener interface that can be used to get notification once a shard or all shards + * of an certain index that are allocated on a node are actually closed. The listener methods + * are invoked once the actual low level instance modifying or reading a shard are closed in contrast to + * removal methods that might return earlier. + */ + public static interface IndexCloseListener { + + /** + * Invoked once all shards are closed or their closing failed. + * @param index the index that got closed + * @param failures the recorded shard closing failures + */ + public void onAllShardsClosed(Index index, List failures); + + /** + * Invoked once the last resource using the given shard ID is released + */ + public void onShardClosed(ShardId shardId); + + /** + * Invoked if closing the given shard failed. + */ + public void onShardCloseFailed(ShardId shardId, Throwable t); + + } } diff --git a/src/main/java/org/elasticsearch/indices/InternalIndicesService.java b/src/main/java/org/elasticsearch/indices/InternalIndicesService.java index 2cb72e6ed04..5e0ea93e94e 100644 --- a/src/main/java/org/elasticsearch/indices/InternalIndicesService.java +++ b/src/main/java/org/elasticsearch/indices/InternalIndicesService.java @@ -65,6 +65,7 @@ import org.elasticsearch.index.shard.service.IndexShard; import org.elasticsearch.index.similarity.SimilarityModule; import org.elasticsearch.index.store.IndexStore; import org.elasticsearch.index.store.IndexStoreModule; +import org.elasticsearch.index.store.Store; import org.elasticsearch.indices.analysis.IndicesAnalysisService; import org.elasticsearch.indices.recovery.RecoverySettings; import org.elasticsearch.indices.store.IndicesStore; @@ -74,10 +75,8 @@ import org.elasticsearch.plugins.PluginsService; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Executor; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicBoolean; import static com.google.common.collect.Maps.newHashMap; import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS; @@ -136,17 +135,27 @@ public class InternalIndicesService extends AbstractLifecycleComponent failures) { + latch.countDown(); + } + @Override + public void onShardClosed(ShardId shardId) {} + @Override + public void onShardCloseFailed(ShardId shardId, Throwable t) {} + }); } catch (Throwable e) { - logger.warn("failed to delete index on stop [" + index + "]", e); - } finally { latch.countDown(); + logger.warn("failed to delete index on stop [" + index + "]", e); } } }); } try { - latch.await(); + if (latch.await(30, TimeUnit.SECONDS) == false) { + logger.warn("Not all shards are closed yet, waited 30sec - stopping service"); + } } catch (InterruptedException e) { // ignore } finally { @@ -316,10 +325,15 @@ public class InternalIndicesService extends AbstractLifecycleComponent failedShards = ConcurrentCollections.newConcurrentMap(); + private final NodeEnvironment nodeEnvironment; static class FailedShard { public final long version; @@ -119,7 +120,8 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent failures) { + try { + nodeEnvironment.deleteIndexDirectorySafe(index); + logger.debug("deleted index [{}] from filesystem", index); + } catch (Exception e) { + logger.debug("failed to deleted index [{}] from filesystem", e, index); + // ignore - still some shards locked here + } + } + + @Override + public void onShardClosed(ShardId shardId) { + try { + nodeEnvironment.deleteShardDirectorySafe(shardId); + logger.debug("deleted shard [{}] from filesystem", shardId); + } catch (IOException e) { + logger.warn("Can't delete shard {} ", e, shardId); + } + } + + @Override + public void onShardCloseFailed(ShardId shardId, Throwable t) { + } + }); } } } @@ -839,10 +868,13 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent locks = env.lockAllForIndex(new Index("foo")); + try { + env.shardLock(new ShardId("foo", randomBoolean() ? 1 : 2)); + fail("shard is locked"); + } catch (LockObtainFailedException ex) { + // expected + } + IOUtils.close(locks); + assertTrue("LockedShards: " + env.lockedShards(), env.lockedShards().isEmpty()); + } + + @Test + public void testGetAllIndices() throws Exception { + Settings settings = nodeEnvSettings(tmpPaths()); + NodeEnvironment env = new NodeEnvironment(settings, new Environment(settings)); + final int numIndices = randomIntBetween(1, 10); + for (int i = 0; i < numIndices; i++) { + for (Path path : env.indexPaths(new Index("foo" + i))) { + Files.createDirectories(path); + } + } + Set indices = env.findAllIndices(); + assertEquals(indices.size(), numIndices); + for (int i = 0; i < numIndices; i++) { + assertTrue(indices.contains("foo" + i)); + } + assertTrue("LockedShards: " + env.lockedShards(), env.lockedShards().isEmpty()); + } + + @Test + public void testDeleteSafe() throws IOException { + Settings settings = nodeEnvSettings(tmpPaths()); + NodeEnvironment env = new NodeEnvironment(settings, new Environment(settings)); + + ShardLock fooLock = env.shardLock(new ShardId("foo", 1)); + assertEquals(new ShardId("foo", 1), fooLock.getShardId()); + + + for (Path path : env.indexPaths(new Index("foo"))) { + Files.createDirectories(path.resolve("1")); + Files.createDirectories(path.resolve("2")); + } + + try { + env.deleteShardDirectorySafe(new ShardId("foo", 1)); + fail("shard is locked"); + } catch (LockObtainFailedException ex) { + // expected + } + + for (Path path : env.indexPaths(new Index("foo"))) { + assertTrue(Files.exists(path.resolve("1"))); + assertTrue(Files.exists(path.resolve("2"))); + + } + + env.deleteShardDirectorySafe(new ShardId("foo", 2)); + + for (Path path : env.indexPaths(new Index("foo"))) { + assertTrue(Files.exists(path.resolve("1"))); + assertFalse(Files.exists(path.resolve("2"))); + } + + try { + env.deleteIndexDirectorySafe(new Index("foo")); + fail("shard is locked"); + } catch (LockObtainFailedException ex) { + // expected + } + fooLock.close(); + + for (Path path : env.indexPaths(new Index("foo"))) { + assertTrue(Files.exists(path)); + } + + env.deleteIndexDirectorySafe(new Index("foo")); + + for (Path path : env.indexPaths(new Index("foo"))) { + assertFalse(Files.exists(path)); + } + assertTrue("LockedShards: " + env.lockedShards(), env.lockedShards().isEmpty()); + } + + @Test + public void testGetAllShards() throws Exception { + Settings settings = nodeEnvSettings(tmpPaths()); + NodeEnvironment env = new NodeEnvironment(settings, new Environment(settings)); + final int numIndices = randomIntBetween(1, 10); + final Set createdShards = new HashSet<>(); + for (int i = 0; i < numIndices; i++) { + for (Path path : env.indexPaths(new Index("foo" + i))) { + final int numShards = randomIntBetween(1, 10); + for (int j = 0; j < numShards; j++) { + Files.createDirectories(path.resolve(Integer.toString(j))); + createdShards.add(new ShardId("foo" + i, j)); + } + } + } + Set shards = env.findAllShardIds(); + assertEquals(shards.size(), createdShards.size()); + assertEquals(shards, createdShards); + + Index index = new Index("foo" + randomIntBetween(1, numIndices)); + shards = env.findAllShardIds(index); + for (ShardId id : createdShards) { + if (index.getName().equals(id.getIndex())) { + assertNotNull("missing shard " + id, shards.remove(id)); + } + } + assertEquals("too many shards found", shards.size(), 0); + assertTrue("LockedShards: " + env.lockedShards(), env.lockedShards().isEmpty()); + } + + @Test + public void testStressShardLock() throws IOException, InterruptedException { + class Int { + int value = 0; + } + Settings settings = nodeEnvSettings(tmpPaths()); + final NodeEnvironment env = new NodeEnvironment(settings, new Environment(settings)); + final int shards = randomIntBetween(2, 10); + final Int[] counts = new Int[shards]; + final AtomicInteger[] countsAtomic = new AtomicInteger[shards]; + final AtomicInteger[] flipFlop = new AtomicInteger[shards]; + + for (int i = 0; i < counts.length; i++) { + counts[i] = new Int(); + countsAtomic[i] = new AtomicInteger(); + flipFlop[i] = new AtomicInteger(); + } + + Thread[] threads = new Thread[randomIntBetween(2,5)]; + final CountDownLatch latch = new CountDownLatch(1); + final int iters = scaledRandomIntBetween(10000, 100000); + for (int i = 0; i < threads.length; i++) { + threads[i] = new Thread() { + @Override + public void run() { + try { + latch.await(); + } catch (InterruptedException e) { + fail(e.getMessage()); + } + for (int i = 0; i < iters; i++) { + int shard = randomIntBetween(0, counts.length-1); + try { + try (ShardLock _ = env.shardLock(new ShardId("foo", shard), scaledRandomIntBetween(0, 10))) { + counts[shard].value++; + countsAtomic[shard].incrementAndGet(); + assertEquals(flipFlop[shard].incrementAndGet(), 1); + assertEquals(flipFlop[shard].decrementAndGet(), 0); + } + } catch (LockObtainFailedException ex) { + // ok + } catch (IOException ex) { + fail(ex.toString()); + } + } + } + }; + threads[i].start(); + } + latch.countDown(); // fire the threads up + for (int i = 0; i < threads.length; i++) { + threads[i].join(); + } + + assertTrue("LockedShards: " + env.lockedShards(), env.lockedShards().isEmpty()); + for (int i = 0; i < counts.length; i++) { + assertTrue(counts[i].value > 0); + assertEquals(flipFlop[i].get(), 0); + assertEquals(counts[i].value, countsAtomic[i].get()); + } + + } + + private String[] tmpPaths() { + final int numPaths = randomIntBetween(1, 3); + final String[] absPaths = new String[numPaths]; + for (int i = 0; i < numPaths; i++) { + absPaths[i] = newTempDir().getAbsolutePath(); + } + return absPaths; + } + + private Settings nodeEnvSettings(String[] dataPaths) { + return ImmutableSettings.builder() + .put("path.home", newTempDir().getAbsolutePath()) + .putArray("path.data", dataPaths).build(); + } +} diff --git a/src/test/java/org/elasticsearch/index/engine/internal/InternalEngineTests.java b/src/test/java/org/elasticsearch/index/engine/internal/InternalEngineTests.java index 30e05f98aa8..68e384de1e5 100644 --- a/src/test/java/org/elasticsearch/index/engine/internal/InternalEngineTests.java +++ b/src/test/java/org/elasticsearch/index/engine/internal/InternalEngineTests.java @@ -30,10 +30,9 @@ import org.apache.lucene.document.NumericDocValuesField; import org.apache.lucene.document.TextField; import org.apache.lucene.index.CorruptIndexException; import org.apache.lucene.index.IndexDeletionPolicy; -import org.apache.lucene.index.SegmentInfos; import org.apache.lucene.index.Term; import org.apache.lucene.search.TermQuery; -import org.apache.lucene.store.AlreadyClosedException; +import org.apache.lucene.store.Lock; import org.apache.lucene.util.LuceneTestCase.Slow; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.Version; @@ -44,6 +43,7 @@ import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.lucene.uid.Versions; import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.env.ShardLock; import org.elasticsearch.index.Index; import org.elasticsearch.index.VersionType; import org.elasticsearch.index.analysis.AnalysisService; @@ -73,6 +73,7 @@ import org.elasticsearch.index.store.ram.RamDirectoryService; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.index.translog.TranslogSizeMatcher; import org.elasticsearch.index.translog.fs.FsTranslog; +import org.elasticsearch.test.DummyShardLock; import org.elasticsearch.test.ElasticsearchTestCase; import org.elasticsearch.threadpool.ThreadPool; import org.hamcrest.MatcherAssert; @@ -173,12 +174,12 @@ public class InternalEngineTests extends ElasticsearchTestCase { protected Store createStore() throws IOException { DirectoryService directoryService = new RamDirectoryService(shardId, EMPTY_SETTINGS); - return new Store(shardId, EMPTY_SETTINGS, null, directoryService, new LeastUsedDistributor(directoryService)); + return new Store(shardId, EMPTY_SETTINGS, null, directoryService, new LeastUsedDistributor(directoryService), new DummyShardLock(shardId)); } protected Store createStoreReplica() throws IOException { DirectoryService directoryService = new RamDirectoryService(shardId, EMPTY_SETTINGS); - return new Store(shardId, EMPTY_SETTINGS, null, directoryService, new LeastUsedDistributor(directoryService)); + return new Store(shardId, EMPTY_SETTINGS, null, directoryService, new LeastUsedDistributor(directoryService), new DummyShardLock(shardId)); } protected Translog createTranslog() { diff --git a/src/test/java/org/elasticsearch/index/merge/policy/MergePolicySettingsTest.java b/src/test/java/org/elasticsearch/index/merge/policy/MergePolicySettingsTest.java index 7d350a327b2..76d14f42e7c 100644 --- a/src/test/java/org/elasticsearch/index/merge/policy/MergePolicySettingsTest.java +++ b/src/test/java/org/elasticsearch/index/merge/policy/MergePolicySettingsTest.java @@ -18,9 +18,11 @@ */ package org.elasticsearch.index.merge.policy; +import org.apache.lucene.store.Lock; import org.elasticsearch.ElasticsearchIllegalArgumentException; import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.env.ShardLock; import org.elasticsearch.index.Index; import org.elasticsearch.index.settings.IndexSettingsService; import org.elasticsearch.index.shard.ShardId; @@ -28,6 +30,7 @@ import org.elasticsearch.index.store.DirectoryService; import org.elasticsearch.index.store.Store; import org.elasticsearch.index.store.distributor.LeastUsedDistributor; import org.elasticsearch.index.store.ram.RamDirectoryService; +import org.elasticsearch.test.DummyShardLock; import org.elasticsearch.test.ElasticsearchTestCase; import org.junit.Test; @@ -172,7 +175,7 @@ public class MergePolicySettingsTest extends ElasticsearchTestCase { protected Store createStore(Settings settings) throws IOException { DirectoryService directoryService = new RamDirectoryService(shardId, EMPTY_SETTINGS); - return new Store(shardId, settings, null, directoryService, new LeastUsedDistributor(directoryService)); + return new Store(shardId, settings, null, directoryService, new LeastUsedDistributor(directoryService), new DummyShardLock(shardId)); } } diff --git a/src/test/java/org/elasticsearch/index/store/CorruptedFileTest.java b/src/test/java/org/elasticsearch/index/store/CorruptedFileTest.java index 31ed433bd25..dc59574eecb 100644 --- a/src/test/java/org/elasticsearch/index/store/CorruptedFileTest.java +++ b/src/test/java/org/elasticsearch/index/store/CorruptedFileTest.java @@ -92,7 +92,7 @@ public class CorruptedFileTest extends ElasticsearchIntegrationTest { protected Settings nodeSettings(int nodeOrdinal) { return ImmutableSettings.builder() // we really need local GW here since this also checks for corruption etc. - // and we need to make sure primaries are not just trashed if we don'tmvn have replicas + // and we need to make sure primaries are not just trashed if we don't have replicas .put(super.nodeSettings(nodeOrdinal)).put("gateway.type", "local") .put(TransportModule.TRANSPORT_SERVICE_TYPE_KEY, MockTransportService.class.getName()).build(); } diff --git a/src/test/java/org/elasticsearch/index/store/DistributorDirectoryTest.java b/src/test/java/org/elasticsearch/index/store/DistributorDirectoryTest.java index 2b20d8bf761..ea25092e375 100644 --- a/src/test/java/org/elasticsearch/index/store/DistributorDirectoryTest.java +++ b/src/test/java/org/elasticsearch/index/store/DistributorDirectoryTest.java @@ -18,10 +18,6 @@ */ package org.elasticsearch.index.store; -import com.carrotsearch.randomizedtesting.annotations.Listeners; -import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters; -import com.carrotsearch.randomizedtesting.annotations.ThreadLeakScope; -import com.carrotsearch.randomizedtesting.annotations.TimeoutSuite; import com.carrotsearch.randomizedtesting.annotations.*; import com.carrotsearch.randomizedtesting.generators.RandomPicks; import org.apache.lucene.index.IndexFileNames; @@ -35,12 +31,10 @@ import org.elasticsearch.index.store.distributor.Distributor; import org.elasticsearch.test.ElasticsearchThreadFilter; import org.elasticsearch.test.junit.listeners.LoggingListener; -import java.io.IOException; -import java.nio.file.Path; -import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; import java.nio.file.NoSuchFileException; +import java.nio.file.Path; import java.util.Arrays; @ThreadLeakFilters(defaultFilters = true, filters = {ElasticsearchThreadFilter.class}) @@ -62,13 +56,7 @@ public class DistributorDirectoryTest extends BaseDirectoryTestCase { ((MockDirectoryWrapper) directories[i]).setEnableVirusScanner(false); } } - return new FilterDirectory(new DistributorDirectory(directories)) { - @Override - public void close() throws IOException { - assertTrue(DistributorDirectory.assertConsistency(logger, ((DistributorDirectory) this.getDelegate()))); - super.close(); - } - }; + return new DistributorDirectory(directories); } // #7306: don't invoke the distributor when we are opening an already existing file @@ -105,7 +93,6 @@ public class DistributorDirectoryTest extends BaseDirectoryTestCase { } catch (IllegalStateException ise) { // expected } - assertTrue(DistributorDirectory.assertConsistency(logger, dd)); dd.close(); } @@ -167,7 +154,6 @@ public class DistributorDirectoryTest extends BaseDirectoryTestCase { // target file already exists } } - assertTrue(DistributorDirectory.assertConsistency(logger, dd)); IOUtils.close(dd); } } diff --git a/src/test/java/org/elasticsearch/index/store/DistributorInTheWildTest.java b/src/test/java/org/elasticsearch/index/store/DistributorInTheWildTest.java index 394bc4df243..f9bc097fa65 100644 --- a/src/test/java/org/elasticsearch/index/store/DistributorInTheWildTest.java +++ b/src/test/java/org/elasticsearch/index/store/DistributorInTheWildTest.java @@ -27,7 +27,6 @@ import org.apache.lucene.index.IndexReader; import org.apache.lucene.index.ThreadedIndexingAndSearchingTestCase; import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.store.Directory; -import org.apache.lucene.store.FilterDirectory; import org.apache.lucene.store.MockDirectoryWrapper; import org.apache.lucene.util.LuceneTestCase; import org.elasticsearch.common.logging.ESLogger; @@ -153,17 +152,11 @@ public class DistributorInTheWildTest extends ThreadedIndexingAndSearchingTestCa } try { - FilterDirectory distributorDirectory = new FilterDirectory(new DistributorDirectory(directories)) { - @Override - public void close() throws IOException { - assertTrue(DistributorDirectory.assertConsistency(logger, (DistributorDirectory) this.getDelegate())); - super.close(); - } - }; + if (random().nextBoolean()) { - return new MockDirectoryWrapper(random(), distributorDirectory); + return new MockDirectoryWrapper(random(), new DistributorDirectory(directories)); } else { - return distributorDirectory; + return new DistributorDirectory(directories); } } catch (IOException ex) { throw new RuntimeException(ex); diff --git a/src/test/java/org/elasticsearch/index/store/StoreTest.java b/src/test/java/org/elasticsearch/index/store/StoreTest.java index 83ed29a8855..1570ae23de0 100644 --- a/src/test/java/org/elasticsearch/index/store/StoreTest.java +++ b/src/test/java/org/elasticsearch/index/store/StoreTest.java @@ -18,7 +18,6 @@ */ package org.elasticsearch.index.store; -import com.carrotsearch.randomizedtesting.annotations.Repeat; import org.apache.lucene.analysis.MockAnalyzer; import org.apache.lucene.codecs.CodecUtil; import org.apache.lucene.document.*; @@ -29,11 +28,13 @@ import org.apache.lucene.util.IOUtils; import org.apache.lucene.util.TestUtil; import org.apache.lucene.util.Version; import org.elasticsearch.common.settings.ImmutableSettings; +import org.elasticsearch.env.ShardLock; import org.elasticsearch.index.Index; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.store.distributor.Distributor; import org.elasticsearch.index.store.distributor.LeastUsedDistributor; import org.elasticsearch.index.store.distributor.RandomWeightedDistributor; +import org.elasticsearch.test.DummyShardLock; import org.elasticsearch.test.ElasticsearchLuceneTestCase; import org.junit.Test; @@ -41,6 +42,7 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.nio.file.NoSuchFileException; import java.util.*; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.zip.Adler32; import static com.carrotsearch.randomizedtesting.RandomizedTest.*; @@ -52,7 +54,7 @@ public class StoreTest extends ElasticsearchLuceneTestCase { public void testRefCount() throws IOException { final ShardId shardId = new ShardId(new Index("index"), 1); DirectoryService directoryService = new LuceneManagedDirectoryService(random()); - Store store = new Store(shardId, ImmutableSettings.EMPTY, null, directoryService, randomDistributor(directoryService)); + Store store = new Store(shardId, ImmutableSettings.EMPTY, null, directoryService, randomDistributor(directoryService), new DummyShardLock(shardId)); int incs = randomIntBetween(1, 100); for (int i = 0; i < incs; i++) { if (randomBoolean()) { @@ -69,7 +71,14 @@ public class StoreTest extends ElasticsearchLuceneTestCase { } store.incRef(); - store.close(); + final AtomicBoolean called = new AtomicBoolean(false); + Store.OnCloseListener listener = new Store.OnCloseListener() { + @Override + public void onClose(ShardId shardId) { + assertTrue(called.compareAndSet(false, true)); + } + }; + store.close(listener); for (int i = 0; i < incs; i++) { if (randomBoolean()) { store.incRef(); @@ -84,7 +93,9 @@ public class StoreTest extends ElasticsearchLuceneTestCase { store.ensureOpen(); } + assertFalse(called.get()); store.decRef(); + assertTrue(called.get()); assertFalse(store.tryIncRef()); try { store.incRef(); @@ -100,6 +111,27 @@ public class StoreTest extends ElasticsearchLuceneTestCase { } } + @Test + public void testListenerCanThrowException() throws IOException { + final ShardId shardId = new ShardId(new Index("index"), 1); + DirectoryService directoryService = new LuceneManagedDirectoryService(random()); + final ShardLock shardLock = new DummyShardLock(shardId); + Store store = new Store(shardId, ImmutableSettings.EMPTY, null, directoryService, randomDistributor(directoryService), shardLock); + final AtomicBoolean called = new AtomicBoolean(false); + Store.OnCloseListener listener = new Store.OnCloseListener() { + @Override + public void onClose(ShardId shardId) { + assertTrue(called.compareAndSet(false, true)); + throw new RuntimeException("foobar"); + } + }; + assertTrue(shardLock.isOpen()); + store.close(listener); + assertTrue(called.get()); + assertFalse(shardLock.isOpen()); + // test will barf if the directory is not closed + } + @Test public void testVerifyingIndexOutput() throws IOException { Directory dir = newDirectory(); @@ -160,7 +192,7 @@ public class StoreTest extends ElasticsearchLuceneTestCase { public void testWriteLegacyChecksums() throws IOException { final ShardId shardId = new ShardId(new Index("index"), 1); DirectoryService directoryService = new LuceneManagedDirectoryService(random()); - Store store = new Store(shardId, ImmutableSettings.EMPTY, null, directoryService, randomDistributor(directoryService)); + Store store = new Store(shardId, ImmutableSettings.EMPTY, null, directoryService, randomDistributor(directoryService), new DummyShardLock(shardId)); // set default codec - all segments need checksums IndexWriter writer = new IndexWriter(store.directory(), newIndexWriterConfig(random(), new MockAnalyzer(random())).setCodec(actualDefaultCodec())); int docs = 1 + random().nextInt(100); @@ -229,7 +261,7 @@ public class StoreTest extends ElasticsearchLuceneTestCase { public void testNewChecksums() throws IOException { final ShardId shardId = new ShardId(new Index("index"), 1); DirectoryService directoryService = new LuceneManagedDirectoryService(random()); - Store store = new Store(shardId, ImmutableSettings.EMPTY, null, directoryService, randomDistributor(directoryService)); + Store store = new Store(shardId, ImmutableSettings.EMPTY, null, directoryService, randomDistributor(directoryService), new DummyShardLock(shardId)); // set default codec - all segments need checksums IndexWriter writer = new IndexWriter(store.directory(), newIndexWriterConfig(random(), new MockAnalyzer(random())).setCodec(actualDefaultCodec())); int docs = 1 + random().nextInt(100); @@ -289,7 +321,7 @@ public class StoreTest extends ElasticsearchLuceneTestCase { public void testMixedChecksums() throws IOException { final ShardId shardId = new ShardId(new Index("index"), 1); DirectoryService directoryService = new LuceneManagedDirectoryService(random()); - Store store = new Store(shardId, ImmutableSettings.EMPTY, null, directoryService, randomDistributor(directoryService)); + Store store = new Store(shardId, ImmutableSettings.EMPTY, null, directoryService, randomDistributor(directoryService), new DummyShardLock(shardId)); // this time random codec.... IndexWriter writer = new IndexWriter(store.directory(), newIndexWriterConfig(random(), new MockAnalyzer(random())).setCodec(actualDefaultCodec())); int docs = 1 + random().nextInt(100); @@ -381,7 +413,7 @@ public class StoreTest extends ElasticsearchLuceneTestCase { public void testRenameFile() throws IOException { final ShardId shardId = new ShardId(new Index("index"), 1); DirectoryService directoryService = new LuceneManagedDirectoryService(random(), false); - Store store = new Store(shardId, ImmutableSettings.EMPTY, null, directoryService, randomDistributor(directoryService)); + Store store = new Store(shardId, ImmutableSettings.EMPTY, null, directoryService, randomDistributor(directoryService), new DummyShardLock(shardId)); { IndexOutput output = store.directory().createOutput("foo.bar", IOContext.DEFAULT); int iters = scaledRandomIntBetween(10, 100); @@ -600,7 +632,7 @@ public class StoreTest extends ElasticsearchLuceneTestCase { } } - private static final class LuceneManagedDirectoryService implements DirectoryService { + private static final class LuceneManagedDirectoryService extends DirectoryService { private final Directory[] dirs; private final Random random; @@ -608,6 +640,7 @@ public class StoreTest extends ElasticsearchLuceneTestCase { this(random, true); } public LuceneManagedDirectoryService(Random random, boolean preventDoubleWrite) { + super(new ShardId("fake", 1), ImmutableSettings.EMPTY); this.dirs = new Directory[1 + random.nextInt(5)]; for (int i = 0; i < dirs.length; i++) { dirs[i] = newDirectory(random); @@ -669,7 +702,7 @@ public class StoreTest extends ElasticsearchLuceneTestCase { iwc.setMaxThreadStates(1); final ShardId shardId = new ShardId(new Index("index"), 1); DirectoryService directoryService = new LuceneManagedDirectoryService(random); - Store store = new Store(shardId, ImmutableSettings.EMPTY, null, directoryService, randomDistributor(random, directoryService)); + Store store = new Store(shardId, ImmutableSettings.EMPTY, null, directoryService, randomDistributor(random, directoryService), new DummyShardLock(shardId)); IndexWriter writer = new IndexWriter(store.directory(), iwc); final boolean lotsOfSegments = rarely(random); for (Document d : docs) { @@ -700,7 +733,7 @@ public class StoreTest extends ElasticsearchLuceneTestCase { iwc.setMaxThreadStates(1); final ShardId shardId = new ShardId(new Index("index"), 1); DirectoryService directoryService = new LuceneManagedDirectoryService(random); - store = new Store(shardId, ImmutableSettings.EMPTY, null, directoryService, randomDistributor(random, directoryService)); + store = new Store(shardId, ImmutableSettings.EMPTY, null, directoryService, randomDistributor(random, directoryService), new DummyShardLock(shardId)); IndexWriter writer = new IndexWriter(store.directory(), iwc); final boolean lotsOfSegments = rarely(random); for (Document d : docs) { diff --git a/src/test/java/org/elasticsearch/index/store/distributor/DistributorTests.java b/src/test/java/org/elasticsearch/index/store/distributor/DistributorTests.java index 33ffccfbacb..c1864b7a234 100644 --- a/src/test/java/org/elasticsearch/index/store/distributor/DistributorTests.java +++ b/src/test/java/org/elasticsearch/index/store/distributor/DistributorTests.java @@ -20,6 +20,8 @@ package org.elasticsearch.index.store.distributor; import org.apache.lucene.store.*; +import org.elasticsearch.common.settings.ImmutableSettings; +import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.store.DirectoryService; import org.elasticsearch.test.ElasticsearchTestCase; import org.junit.Test; @@ -29,7 +31,6 @@ import java.io.IOException; import java.nio.file.Path; import java.nio.file.Paths; -import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.*; /** @@ -136,11 +137,12 @@ public class DistributorTests extends ElasticsearchTestCase { } - public static class FakeDirectoryService implements DirectoryService { + public static class FakeDirectoryService extends DirectoryService { private final Directory[] directories; public FakeDirectoryService(Directory[] directories) { + super(new ShardId("fake", 1), ImmutableSettings.EMPTY); this.directories = directories; } diff --git a/src/test/java/org/elasticsearch/indexing/IndexActionTests.java b/src/test/java/org/elasticsearch/indexing/IndexActionTests.java index b4120c570d5..be3f0c22d10 100644 --- a/src/test/java/org/elasticsearch/indexing/IndexActionTests.java +++ b/src/test/java/org/elasticsearch/indexing/IndexActionTests.java @@ -25,7 +25,6 @@ import org.elasticsearch.cluster.metadata.MetaDataCreateIndexService; import org.elasticsearch.index.VersionType; import org.elasticsearch.indices.InvalidIndexNameException; import org.elasticsearch.test.ElasticsearchIntegrationTest; -import org.elasticsearch.test.junit.annotations.TestLogging; import org.junit.Test; import java.util.ArrayList; @@ -35,7 +34,6 @@ import java.util.Random; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicIntegerArray; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; @@ -52,9 +50,8 @@ public class IndexActionTests extends ElasticsearchIntegrationTest { * while the index is being created. */ @Test - @TestLogging("action.search:TRACE,indices.recovery:TRACE,index.shard.service:TRACE") public void testAutoGenerateIdNoDuplicates() throws Exception { - int numberOfIterations = randomIntBetween(10, 50); + int numberOfIterations = scaledRandomIntBetween(10, 50); for (int i = 0; i < numberOfIterations; i++) { Throwable firstError = null; createIndex("test"); @@ -65,6 +62,7 @@ public class IndexActionTests extends ElasticsearchIntegrationTest { builders.add(client().prepareIndex("test", "type").setSource("field", "value")); } indexRandom(true, builders); + ensureYellow("test"); logger.info("verifying indexed content"); int numOfChecks = randomIntBetween(8, 12); for (int j = 0; j < numOfChecks; j++) { diff --git a/src/test/java/org/elasticsearch/test/DummyShardLock.java b/src/test/java/org/elasticsearch/test/DummyShardLock.java new file mode 100644 index 00000000000..078803a8125 --- /dev/null +++ b/src/test/java/org/elasticsearch/test/DummyShardLock.java @@ -0,0 +1,37 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.test; + +import org.elasticsearch.env.ShardLock; +import org.elasticsearch.index.shard.ShardId; + +/* + * A ShardLock that does nothing... for tests only + */ +public class DummyShardLock extends ShardLock { + + public DummyShardLock(ShardId id) { + super(id); + } + + @Override + protected void closeInternal() { + } +} diff --git a/src/test/java/org/elasticsearch/test/InternalTestCluster.java b/src/test/java/org/elasticsearch/test/InternalTestCluster.java index 5d927cb5de3..35311b8131d 100644 --- a/src/test/java/org/elasticsearch/test/InternalTestCluster.java +++ b/src/test/java/org/elasticsearch/test/InternalTestCluster.java @@ -29,6 +29,8 @@ import com.google.common.collect.*; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; +import org.apache.lucene.store.Lock; +import org.apache.lucene.store.NativeFSLockFactory; import org.apache.lucene.util.AbstractRandomizedTest; import org.apache.lucene.util.IOUtils; import org.elasticsearch.ElasticsearchException; @@ -73,6 +75,7 @@ import org.elasticsearch.index.cache.filter.none.NoneFilterCache; import org.elasticsearch.index.cache.filter.weighted.WeightedFilterCache; import org.elasticsearch.index.engine.IndexEngineModule; import org.elasticsearch.index.service.IndexService; +import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService; @@ -99,7 +102,11 @@ import java.io.Closeable; import java.io.File; import java.io.IOException; import java.net.InetSocketAddress; +import java.nio.file.FileVisitResult; +import java.nio.file.FileVisitor; +import java.nio.file.Files; import java.nio.file.Path; +import java.nio.file.attribute.BasicFileAttributes; import java.util.*; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; @@ -114,6 +121,7 @@ import static org.elasticsearch.node.NodeBuilder.nodeBuilder; import static org.elasticsearch.test.ElasticsearchTestCase.assertBusy; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoTimeout; import static org.hamcrest.Matchers.*; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertThat; @@ -1698,4 +1706,19 @@ public final class InternalTestCluster extends TestCluster { } } + @Override + public void assertAfterTest() throws IOException { + super.assertAfterTest(); + for (NodeEnvironment env : this.getInstances(NodeEnvironment.class)) { + Set shardIds = env.lockedShards(); + for (ShardId id : shardIds) { + try { + env.shardLock(id, TimeUnit.SECONDS.toMillis(5)).close(); + } catch (IOException ex) { + fail("Shard " + id + " is still locked after 5 sec waiting"); + } + } + } + } + } diff --git a/src/test/java/org/elasticsearch/test/TestCluster.java b/src/test/java/org/elasticsearch/test/TestCluster.java index 18b0f825d84..fd3531b677d 100644 --- a/src/test/java/org/elasticsearch/test/TestCluster.java +++ b/src/test/java/org/elasticsearch/test/TestCluster.java @@ -80,7 +80,7 @@ public abstract class TestCluster implements Iterable, Closeable { /** * This method checks all the things that need to be checked after each test */ - public void assertAfterTest() { + public void assertAfterTest() throws IOException { assertAllSearchersClosed(); assertAllFilesClosed(); ensureEstimatedStats(); diff --git a/src/test/java/org/elasticsearch/test/store/MockFSDirectoryService.java b/src/test/java/org/elasticsearch/test/store/MockFSDirectoryService.java index 32a1a3e6a4c..0343b0f9321 100644 --- a/src/test/java/org/elasticsearch/test/store/MockFSDirectoryService.java +++ b/src/test/java/org/elasticsearch/test/store/MockFSDirectoryService.java @@ -40,6 +40,7 @@ import org.elasticsearch.index.shard.service.IndexShard; import org.elasticsearch.index.shard.service.InternalIndexShard; import org.elasticsearch.index.store.IndexStore; import org.elasticsearch.index.store.Store; +import org.elasticsearch.index.store.distributor.Distributor; import org.elasticsearch.index.store.fs.FsDirectoryService; import org.elasticsearch.indices.IndicesLifecycle; import org.elasticsearch.indices.IndicesService; @@ -149,6 +150,7 @@ public class MockFSDirectoryService extends FsDirectoryService { } catch (Exception e) { logger.warn("failed to check index", e); } finally { + logger.info("end check index"); store.decRef(); } } @@ -168,4 +170,9 @@ public class MockFSDirectoryService extends FsDirectoryService { public long throttleTimeInNanos() { return delegateService.throttleTimeInNanos(); } + + @Override + public Directory newFromDistributor(Distributor distributor) throws IOException { + return helper.wrap(super.newFromDistributor(distributor)); + } } diff --git a/src/test/java/org/elasticsearch/test/store/MockRamDirectoryService.java b/src/test/java/org/elasticsearch/test/store/MockRamDirectoryService.java index e8c3ac35411..447c535b557 100644 --- a/src/test/java/org/elasticsearch/test/store/MockRamDirectoryService.java +++ b/src/test/java/org/elasticsearch/test/store/MockRamDirectoryService.java @@ -22,7 +22,6 @@ package org.elasticsearch.test.store; import org.apache.lucene.store.Directory; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.index.shard.AbstractIndexShardComponent; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.store.DirectoryService; import org.elasticsearch.test.ElasticsearchIntegrationTest; @@ -30,7 +29,7 @@ import org.elasticsearch.test.ElasticsearchIntegrationTest; import java.io.IOException; import java.util.Random; -public class MockRamDirectoryService extends AbstractIndexShardComponent implements DirectoryService { +public class MockRamDirectoryService extends DirectoryService { private final MockDirectoryHelper helper; private final DirectoryService delegateService;