diff --git a/src/main/java/org/elasticsearch/env/NodeEnvironment.java b/src/main/java/org/elasticsearch/env/NodeEnvironment.java
index 0416a13855f..237ead4fc56 100644
--- a/src/main/java/org/elasticsearch/env/NodeEnvironment.java
+++ b/src/main/java/org/elasticsearch/env/NodeEnvironment.java
@@ -22,11 +22,7 @@ package org.elasticsearch.env;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Sets;
 import com.google.common.primitives.Ints;
-import org.apache.lucene.store.Directory;
-import org.apache.lucene.store.FSDirectory;
-import org.apache.lucene.store.Lock;
-import org.apache.lucene.store.LockObtainFailedException;
-import org.apache.lucene.store.NativeFSLockFactory;
+import org.apache.lucene.store.*;
 import org.apache.lucene.util.IOUtils;
 import org.elasticsearch.ElasticsearchIllegalStateException;
 import org.elasticsearch.cluster.node.DiscoveryNode;
@@ -42,7 +38,8 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.nio.file.*;
 import java.util.*;
-import java.util.concurrent.*;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
@@ -166,10 +163,11 @@ public class NodeEnvironment extends AbstractComponent implements Closeable{
      * non of the shards will be deleted
      *
      * @param index the index to delete
+     * @param lockTimeoutMS how long to wait for acquiring the index's shard locks
      * @throws Exception if any of the shards data directories can't be locked or deleted
      */
-    public void deleteIndexDirectorySafe(Index index) throws IOException {
-        final List<ShardLock> locks = lockAllForIndex(index);
+    public void deleteIndexDirectorySafe(Index index, long lockTimeoutMS) throws IOException {
+        final List<ShardLock> locks = lockAllForIndex(index, lockTimeoutMS);
         try {
             final Path[] indexPaths = new Path[nodeIndicesPaths.length];
             for (int i = 0; i < indexPaths.length; i++) {
@@ -188,16 +186,19 @@ public class NodeEnvironment extends AbstractComponent implements Closeable{
      * an {@link LockObtainFailedException} is thrown and all previously acquired locks are released.
      *
      * @param index the index to lock shards for
+     * @param lockTimeoutMS how long to wait for acquiring the index's shard locks
      * @return the {@link ShardLock} instances for this index.
      * @throws IOException if an IOException occurs.
      */
-    public List<ShardLock> lockAllForIndex(Index index) throws IOException {
+    public List<ShardLock> lockAllForIndex(Index index, long lockTimeoutMS) throws IOException {
         Set<ShardId> allShardIds = findAllShardIds(index);
         List<ShardLock> allLocks = new ArrayList<>();
         boolean success = false;
+        long startTime = System.currentTimeMillis();
         try {
             for (ShardId shardId : allShardIds) {
-                allLocks.add(shardLock(shardId));
+                long timeoutLeft = Math.max(0, lockTimeoutMS - (System.currentTimeMillis() - startTime));
+                allLocks.add(shardLock(shardId, timeoutLeft));
             }
             success = true;
         } finally {
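Note on the hunk above: lockAllForIndex now treats lockTimeoutMS as a single budget shared by all shard locks of the index - each shardLock() call only gets whatever is left of the original timeout. Below is a minimal standalone sketch of that deadline pattern using plain JDK locks; all names are illustrative and nothing in it comes from the PR itself.

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;
    import java.util.concurrent.locks.ReentrantLock;

    public class DeadlineLocking {
        static List<ReentrantLock> lockAll(List<ReentrantLock> locks, long timeoutMs)
                throws InterruptedException, TimeoutException {
            final long start = System.currentTimeMillis();
            final List<ReentrantLock> acquired = new ArrayList<>();
            boolean success = false;
            try {
                for (ReentrantLock lock : locks) {
                    // every attempt draws from the one shared budget
                    final long timeoutLeft = Math.max(0, timeoutMs - (System.currentTimeMillis() - start));
                    if (lock.tryLock(timeoutLeft, TimeUnit.MILLISECONDS) == false) {
                        throw new TimeoutException("could not acquire all locks within " + timeoutMs + "ms");
                    }
                    acquired.add(lock);
                }
                success = true;
            } finally {
                if (success == false) {
                    for (ReentrantLock lock : acquired) {
                        lock.unlock(); // roll back the partial acquisition
                    }
                }
            }
            return acquired;
        }
    }

The Math.max(0, ...) clamp matters: once the budget is exhausted, the remaining locks are attempted with a zero timeout and fail fast instead of blocking.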
diff --git a/src/main/java/org/elasticsearch/gateway/GatewayMetaState.java b/src/main/java/org/elasticsearch/gateway/GatewayMetaState.java
index c38b9f92ee7..90d3c287253 100644
--- a/src/main/java/org/elasticsearch/gateway/GatewayMetaState.java
+++ b/src/main/java/org/elasticsearch/gateway/GatewayMetaState.java
@@ -43,7 +43,10 @@ import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
 import org.elasticsearch.common.util.concurrent.FutureUtils;
-import org.elasticsearch.common.xcontent.*;
+import org.elasticsearch.common.xcontent.ToXContent;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.common.xcontent.XContentParser;
+import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.env.NodeEnvironment;
 import org.elasticsearch.env.ShardLock;
 import org.elasticsearch.index.Index;
@@ -72,6 +75,7 @@ public class GatewayMetaState extends AbstractComponent implements ClusterStateL
     private static final String DEPRECATED_SETTING_ROUTING_HASH_FUNCTION = "cluster.routing.operation.hash.type";
     private static final String DEPRECATED_SETTING_ROUTING_USE_TYPE = "cluster.routing.operation.use_type";
     public static final String GATEWAY_DANGLING_TIMEOUT = "gateway.dangling_timeout";
+    public static final String GATEWAY_DELETE_TIMEOUT = "gateway.delete_timeout";
     public static final String GATEWAY_AUTO_IMPORT_DANGLED = "gateway.auto_import_dangled";
     // legacy - this used to be in a different package
     private static final String GATEWAY_LOCAL_DANGLING_TIMEOUT = "gateway.local.dangling_timeout";
@@ -127,6 +131,7 @@ public class GatewayMetaState extends AbstractComponent implements ClusterStateL
     private final AutoImportDangledState autoImportDangled;
     private final TimeValue danglingTimeout;
+    private final TimeValue deleteTimeout;
     private final Map<String, DanglingIndex> danglingIndices = ConcurrentCollections.newConcurrentMap();
     private final Object danglingMutex = new Object();
@@ -159,8 +164,12 @@ public class GatewayMetaState extends AbstractComponent implements ClusterStateL
         this.autoImportDangled = AutoImportDangledState.fromString(settings.get(GATEWAY_AUTO_IMPORT_DANGLED, settings.get(GATEWAY_LOCAL_AUTO_IMPORT_DANGLED, AutoImportDangledState.YES.toString())));
         this.danglingTimeout = settings.getAsTime(GATEWAY_DANGLING_TIMEOUT, settings.getAsTime(GATEWAY_LOCAL_DANGLING_TIMEOUT, TimeValue.timeValueHours(2)));
+        this.deleteTimeout = settings.getAsTime(GATEWAY_DELETE_TIMEOUT, TimeValue.timeValueSeconds(30));
 
-        logger.debug("using gateway.local.auto_import_dangled [{}], with gateway.dangling_timeout [{}]", this.autoImportDangled, this.danglingTimeout);
+        logger.debug("using {} [{}], {} [{}], with {} [{}]",
+                GATEWAY_AUTO_IMPORT_DANGLED, this.autoImportDangled,
+                GATEWAY_DELETE_TIMEOUT, this.deleteTimeout,
+                GATEWAY_DANGLING_TIMEOUT, this.danglingTimeout);
 
         if (DiscoveryNode.masterNode(settings) ||
                 DiscoveryNode.dataNode(settings)) {
             nodeEnv.ensureAtomicMoveSupported();
         }
@@ -258,7 +267,10 @@ public class GatewayMetaState extends AbstractComponent implements ClusterStateL
                     try {
                         final Index idx = new Index(current.index());
                         MetaDataStateFormat.deleteMetaState(nodeEnv.indexPaths(idx));
-                        nodeEnv.deleteIndexDirectorySafe(idx);
+                        // it may take a couple of seconds for outstanding shard references
+                        // to release their refs (for example, ongoing recoveries)
+                        // we are working on a better solution see: https://github.com/elasticsearch/elasticsearch/pull/8608
+                        nodeEnv.deleteIndexDirectorySafe(idx, deleteTimeout.millis());
                     } catch (LockObtainFailedException ex) {
                         logger.debug("[{}] failed to delete index - at least one shards is still locked", ex, current.index());
                     } catch (Exception ex) {
@@ -302,14 +314,14 @@ public class GatewayMetaState extends AbstractComponent implements ClusterStateL
                 try {
                     // the index deletion might not have worked due to shards still being locked
                     // we have three cases here:
-                    //  - we acquired all shards locks here --> we can import the dangeling index
+                    //  - we acquired all shards locks here --> we can import the dangling index
                     //  - we failed to acquire the lock --> somebody else uses it - DON'T IMPORT
                     //  - we acquired successfully but the lock list is empty --> no shards present - DON'T IMPORT
                     // in the last case we should in-fact try to delete the directory since it might be a leftover...
-                    final List<ShardLock> shardLocks = nodeEnv.lockAllForIndex(index);
+                    final List<ShardLock> shardLocks = nodeEnv.lockAllForIndex(index, 0);
                     if (shardLocks.isEmpty()) {
                         // no shards - try to remove the directory
-                        nodeEnv.deleteIndexDirectorySafe(index);
+                        nodeEnv.deleteIndexDirectorySafe(index, 0);
                         continue;
                     }
                     IOUtils.closeWhileHandlingException(shardLocks);
@@ -323,7 +335,7 @@ public class GatewayMetaState extends AbstractComponent implements ClusterStateL
                 } else if (danglingTimeout.millis() == 0) {
                     logger.info("[{}] dangling index, exists on local file system, but not in cluster metadata, timeout set to 0, deleting now", indexName);
                     try {
-                        nodeEnv.deleteIndexDirectorySafe(index);
+                        nodeEnv.deleteIndexDirectorySafe(index, 0);
                     } catch (LockObtainFailedException ex) {
                         logger.debug("[{}] failed to delete index - at least one shards is still locked", ex, indexName);
                     } catch (Exception ex) {
@@ -558,7 +570,7 @@ public class GatewayMetaState extends AbstractComponent implements ClusterStateL
             try {
                 MetaDataStateFormat.deleteMetaState(nodeEnv.indexPaths(index));
-                nodeEnv.deleteIndexDirectorySafe(index);
+                nodeEnv.deleteIndexDirectorySafe(index, 0);
             } catch (Exception ex) {
                 logger.debug("failed to delete dangling index", ex);
             }
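The new gateway.delete_timeout setting is parsed as a TimeValue with a 30s default, so it accepts the usual time-unit suffixes (ms, s, m). A hypothetical snippet showing how the lookup in the constructor above resolves, using the 1.x settings API visible in this diff:

    import org.elasticsearch.common.settings.ImmutableSettings;
    import org.elasticsearch.common.settings.Settings;
    import org.elasticsearch.common.unit.TimeValue;

    public class DeleteTimeoutExample {
        public static void main(String[] args) {
            // equivalent to "gateway.delete_timeout: 45s" in elasticsearch.yml
            Settings settings = ImmutableSettings.settingsBuilder()
                    .put("gateway.delete_timeout", "45s")
                    .build();
            // same lookup GatewayMetaState performs: explicit value, or the 30s default
            TimeValue deleteTimeout = settings.getAsTime("gateway.delete_timeout",
                    TimeValue.timeValueSeconds(30));
            System.out.println(deleteTimeout.millis()); // 45000
        }
    }

Dangling-index deletions keep a timeout of 0 (fail fast if a shard is locked); only the cluster-state-driven delete waits up to delete_timeout for in-flight references such as recoveries to let go.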
 import org.elasticsearch.indices.IndicesLifecycle;
-import org.elasticsearch.indices.IndicesService;
 import org.elasticsearch.indices.InternalIndicesLifecycle;
 import org.elasticsearch.plugins.PluginsService;
 import org.elasticsearch.plugins.ShardsPluginsModule;
 
 import java.io.Closeable;
 import java.io.IOException;
-import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -241,14 +237,12 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone
         return aliasesService;
     }
 
-    public synchronized void close(final String reason, final IndicesService.IndexCloseListener listener) {
+    public synchronized void close(final String reason) {
         if (closed.compareAndSet(false, true)) {
             final Set<Integer> shardIds = shardIds();
-            final IndicesService.IndexCloseListener innerListener = listener == null ? null :
-                    new PerShardIndexCloseListener(shardIds, listener);
             for (final int shardId : shardIds) {
                 try {
-                    removeShard(shardId, reason, innerListener);
+                    removeShard(shardId, reason);
                 } catch (Throwable t) {
                     logger.warn("failed to close shard", t);
                 }
@@ -350,12 +344,7 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone
         }
     }
 
-    public void removeShard(int shardId, String reason) throws ElasticsearchException {
-        removeShard(shardId, reason, null);
-    }
-
-    public synchronized void removeShard(int shardId, String reason, @Nullable final IndicesService.IndexCloseListener listener) throws ElasticsearchException {
-        boolean listenerPassed = false;
+    public synchronized void removeShard(int shardId, String reason) throws ElasticsearchException {
         final ShardId sId = new ShardId(index, shardId);
         try {
             final Injector shardInjector;
@@ -441,17 +430,7 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone
             final Store store = shardInjector.getInstance(Store.class);
             // and close it
             try {
-                listenerPassed = true;
-                if (listener == null) {
                     store.close();
-                } else {
-                    store.close(new Store.OnCloseListener() {
-                        @Override
-                        public void onClose(ShardId shardId) {
-                            listener.onShardClosed(shardId);
-                        }
-                    });
-                }
             } catch (Throwable e) {
                 logger.warn("[{}] failed to close store on shard deletion", e, shardId);
             }
@@ -459,51 +438,8 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone
 
             logger.debug("[{}] closed (reason: [{}])", shardId, reason);
         } catch (Throwable t) {
-            if (listenerPassed == false && listener != null) { // only notify if the listener wasn't passed to the store
-                listener.onShardCloseFailed(sId, t);
-            }
             throw t;
         }
     }
-
-    private static final class PerShardIndexCloseListener implements IndicesService.IndexCloseListener {
-        final CountDown countDown;
-        final List<Throwable> failures;
-        private final Set<Integer> shardIds;
-        private final IndicesService.IndexCloseListener listener;
-
-        public PerShardIndexCloseListener(Set<Integer> shardIds, IndicesService.IndexCloseListener listener) {
-            this.shardIds = shardIds;
-            this.listener = listener;
-            countDown = new CountDown(shardIds.size());
-            failures = new CopyOnWriteArrayList<>();
-        }
-
-        @Override
-        public void onAllShardsClosed(Index index, List<Throwable> failures) {
-            assert false : "nobody should call this";
-        }
-
-        @Override
-        public void onShardClosed(ShardId shardId) {
-            assert countDown.isCountedDown() == false;
-            assert shardIds.contains(shardId.getId()) : "Unknown shard id";
-            listener.onShardClosed(shardId);
-            if (countDown.countDown()) {
-                listener.onAllShardsClosed(shardId.index(), failures);
-            }
-        }
-
-        @Override
-        public void onShardCloseFailed(ShardId shardId, Throwable t) {
-            assert countDown.isCountedDown() == false;
-            assert shardIds.contains(shardId.getId()) : "Unknown shard id";
-            listener.onShardCloseFailed(shardId, t);
-            failures.add(t);
-            if (countDown.countDown()) {
-                listener.onAllShardsClosed(shardId.index(), failures);
-            }
-        }
-    }
 }
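With the IndexCloseListener plumbing removed, the remaining lifecycle contract is plain reference counting, as the Store javadoc in the next file spells out: whoever uses a store pins it with incRef() and releases it with decRef() in a finally block, and the store only really closes once the count reaches zero. A small self-contained sketch of that pattern on top of AbstractRefCounted (which Store.java imports below); the resource class here is invented for illustration:

    import org.elasticsearch.common.util.concurrent.AbstractRefCounted;

    public class RefCountedResource extends AbstractRefCounted {
        public RefCountedResource() {
            super("resource"); // the count starts at 1 for the creating owner
        }

        @Override
        protected void closeInternal() {
            // runs exactly once, when the last reference is released
            System.out.println("all references released - freeing resource");
        }

        public static void main(String[] args) {
            RefCountedResource resource = new RefCountedResource();
            resource.incRef();     // pin while in use
            try {
                // ... use the resource ...
            } finally {
                resource.decRef(); // matching release in finally
            }
            resource.decRef();     // drop the initial reference -> closeInternal runs
        }
    }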

diff --git a/src/main/java/org/elasticsearch/index/store/Store.java b/src/main/java/org/elasticsearch/index/store/Store.java
index 1d1a3e91a96..cda1e82dfa1 100644
--- a/src/main/java/org/elasticsearch/index/store/Store.java
+++ b/src/main/java/org/elasticsearch/index/store/Store.java
@@ -28,7 +28,6 @@ import org.apache.lucene.store.*;
 import org.apache.lucene.util.*;
 import org.elasticsearch.ElasticsearchIllegalStateException;
 import org.elasticsearch.ExceptionsHelper;
-import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.inject.Inject;
@@ -45,12 +44,10 @@ import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.concurrent.AbstractRefCounted;
 import org.elasticsearch.common.util.concurrent.RefCounted;
 import org.elasticsearch.env.ShardLock;
-import org.elasticsearch.index.codec.CodecService;
 import org.elasticsearch.index.settings.IndexSettings;
 import org.elasticsearch.index.shard.AbstractIndexShardComponent;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.index.store.distributor.Distributor;
-import org.elasticsearch.indices.recovery.RecoveryFailedException;
 
 import java.io.*;
 import java.nio.file.NoSuchFileException;
@@ -69,7 +66,7 @@ import java.util.zip.Checksum;
  * This class also provides access to metadata information like checksums for committed files. A committed
  * file is a file that belongs to a segment written by a Lucene commit. Files that have not been committed
  * ie. created during a merge or a shard refresh / NRT reopen are not considered in the MetadataSnapshot.
- *
+ * <p>
 * Note: If you use a store it's reference count should be increased before using it by calling #incRef and a
 * corresponding #decRef must be called in a try/finally block to release the store again ie.:
 * <pre>
@@ -103,7 +100,6 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
             Store.this.closeInternal();
         }
     };
-    private volatile OnCloseListener onClose;
 
     @Inject
     public Store(ShardId shardId, @IndexSettings Settings indexSettings, DirectoryService directoryService, Distributor distributor, ShardLock shardLock) throws IOException {
@@ -122,6 +118,7 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
 
     /**
      * Returns the last committed segments info for this store
+     *
      * @throws IOException if the index is corrupted or the segments file is not present
      */
     public SegmentInfos readLastCommittedSegmentsInfo() throws IOException {
@@ -130,6 +127,7 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
 
     /**
      * Returns the segments info for the given commit or for the latest commit if the given commit is null
+     *
      * @throws IOException if the index is corrupted or the segments file is not present
      */
     private static SegmentInfos readSegmentsInfo(IndexCommit commit, Directory directory) throws IOException {
@@ -155,16 +153,17 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
     /**
      * Returns a new MetadataSnapshot for the latest commit in this store or
      * an empty snapshot if no index exists or can not be opened.
+     *
      * @throws CorruptIndexException if the lucene index is corrupted. This can be caused by a checksum mismatch or an
-     * unexpected exception when opening the index reading the segments file.
+     *                               unexpected exception when opening the index reading the segments file.
      */
     public MetadataSnapshot getMetadataOrEmpty() throws IOException {
         try {
             return getMetadata(null);
         } catch (IndexNotFoundException ex) {
-           // that's fine - happens all the time no need to log
+            // that's fine - happens all the time no need to log
         } catch (FileNotFoundException | NoSuchFileException ex) {
-           logger.info("Failed to open / find files while reading metadata snapshot");
+            logger.info("Failed to open / find files while reading metadata snapshot");
         }
         return MetadataSnapshot.EMPTY;
     }
@@ -172,10 +171,10 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
     /**
      * Returns a new MetadataSnapshot for the latest commit in this store.
      *
-     * @throws CorruptIndexException if the lucene index is corrupted. This can be caused by a checksum mismatch or an
-     * unexpected exception when opening the index reading the segments file.
-     * @throws FileNotFoundException if one or more files referenced by a commit are not present.
-     * @throws NoSuchFileException if one or more files referenced by a commit are not present.
+     * @throws CorruptIndexException  if the lucene index is corrupted. This can be caused by a checksum mismatch or an
+     *                                unexpected exception when opening the index reading the segments file.
+     * @throws FileNotFoundException  if one or more files referenced by a commit are not present.
+     * @throws NoSuchFileException    if one or more files referenced by a commit are not present.
      * @throws IndexNotFoundException if no index / valid commit-point can be found in this store
      */
     public MetadataSnapshot getMetadata() throws IOException {
@@ -186,10 +185,10 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
      * Returns a new MetadataSnapshot for the given commit. If the given commit is null
      * the latest commit point is used.
      *
-     * @throws CorruptIndexException if the lucene index is corrupted. This can be caused by a checksum mismatch or an
-     * unexpected exception when opening the index reading the segments file.
-     * @throws FileNotFoundException if one or more files referenced by a commit are not present.
-     * @throws NoSuchFileException if one or more files referenced by a commit are not present.
+     * @throws CorruptIndexException  if the lucene index is corrupted. This can be caused by a checksum mismatch or an
+     *                                unexpected exception when opening the index reading the segments file.
+     * @throws FileNotFoundException  if one or more files referenced by a commit are not present.
+     * @throws NoSuchFileException    if one or more files referenced by a commit are not present.
      * @throws IndexNotFoundException if the commit point can't be found in this store
      */
     public MetadataSnapshot getMetadata(IndexCommit commit) throws IOException {
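The javadoc above distinguishes three failure modes. A hypothetical caller (all names below are illustrative, not from this PR) could map them to recovery decisions roughly like this, mirroring what getMetadataOrEmpty does internally:

    import org.apache.lucene.index.CorruptIndexException;
    import org.apache.lucene.index.IndexNotFoundException;
    import org.elasticsearch.index.store.Store;

    import java.io.FileNotFoundException;
    import java.io.IOException;
    import java.nio.file.NoSuchFileException;

    public class MetadataCaller {
        /** Sketch of one way to react to each documented failure mode. */
        static Store.MetadataSnapshot snapshotForRecovery(Store store) throws IOException {
            try {
                return store.getMetadata();
            } catch (IndexNotFoundException e) {
                // nothing committed yet - recover everything from the source
                return Store.MetadataSnapshot.EMPTY;
            } catch (FileNotFoundException | NoSuchFileException e) {
                // a commit references files that are gone - treat like an empty store
                return Store.MetadataSnapshot.EMPTY;
            } catch (CorruptIndexException e) {
                // checksum mismatch - surface it so the shard can be failed
                throw e;
            }
        }
    }

Note the catch order: Lucene's IndexNotFoundException extends FileNotFoundException, so it must be handled first.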
@@ -290,11 +289,12 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
      * corresponding {@link #decRef}, in a finally clause; otherwise the store may never be closed.  Note that
      * {@link #close} simply calls decRef(), which means that the Store will not really be closed until {@link
      * #decRef} has been called for all outstanding references.
-     *
+     * <p>
      * Note: Close can safely be called multiple times.
+     *
+     * @throws AlreadyClosedException iff the reference counter can not be incremented.
      * @see #decRef
      * @see #tryIncRef()
-     * @throws AlreadyClosedException iff the reference counter can not be incremented.
      */
     @Override
     public final void incRef() {
@@ -308,8 +308,9 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
      * corresponding {@link #decRef}, in a finally clause; otherwise the store may never be closed.  Note that
      * {@link #close} simply calls decRef(), which means that the Store will not really be closed until {@link
      * #decRef} has been called for all outstanding references.
-     *
+     * <p>
      * Note: Close can safely be called multiple times.
+     *
      * @see #decRef()
      * @see #incRef()
      */
@@ -321,6 +322,7 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
     /**
      * Decreases the refCount of this Store instance.If the refCount drops to 0, then this
      * store is closed.
+     *
      * @see #incRef
      */
     @Override
@@ -330,17 +332,8 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
 
     @Override
     public void close() {
-        close(null);
-    }
-
-    /**
-     * Closes this store and installs the given {@link org.elasticsearch.index.store.Store.OnCloseListener}
-     * to be notified once all references to this store are released and the store is closed.
-     */
-    public void close(@Nullable OnCloseListener onClose) {
         if (isClosed.compareAndSet(false, true)) {
-            assert this.onClose == null : "OnClose listener is already set";
-            this.onClose = onClose; // only do this once!
             decRef();
             logger.debug("store reference count on close: " + refCounter.refCount());
@@ -348,36 +341,25 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
     }
 
     private void closeInternal() {
-        final OnCloseListener listener = onClose;
-        onClose = null;
         try {
             directory.innerClose(); // this closes the distributorDirectory as well
         } catch (IOException e) {
             logger.debug("failed to close directory", e);
         } finally {
-            try {
-                if (listener != null) {
-                    listener.onClose(shardId);
-                }
-            } catch (Exception ex){
-                logger.debug("OnCloseListener threw an exception", ex);
-            } finally {
-                IOUtils.closeWhileHandlingException(shardLock);
-            }
-
-
+            IOUtils.closeWhileHandlingException(shardLock);
         }
     }
 
     /**
      * Reads a MetadataSnapshot from the given index locations or returns an empty snapshot if it can't be read.
+     *
      * @throws IOException if the index we try to read is corrupted
      */
     public static MetadataSnapshot readMetadataSnapshot(Path[] indexLocations, ESLogger logger) throws IOException {
         final Directory[] dirs = new Directory[indexLocations.length];
         try {
-            for (int i=0; i< indexLocations.length; i++) {
+            for (int i = 0; i < indexLocations.length; i++) {
                 dirs[i] = new SimpleFSDirectory(indexLocations[i]);
             }
             DistributorDirectory dir = new DistributorDirectory(dirs);
@@ -397,7 +379,7 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
      * The returned IndexOutput might validate the files checksum if the file has been written with a newer lucene version
      * and the metadata holds the necessary information to detect that it was been written by Lucene 4.8 or newer. If it has only
      * a legacy checksum, returned IndexOutput will not verify the checksum.
-     *
+     * <p>
      * Note: Checksums are calculated nevertheless since lucene does it by default sicne version 4.8.0. This method only adds the
      * verification against the checksum in the given metadata and does not add any significant overhead.
      */
@@ -420,16 +402,16 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
             }
             success = true;
         } finally {
-           if (success == false) {
-               IOUtils.closeWhileHandlingException(output);
-           }
+            if (success == false) {
+                IOUtils.closeWhileHandlingException(output);
+            }
         }
         return output;
     }
 
     public static void verify(IndexOutput output) throws IOException {
         if (output instanceof VerifyingIndexOutput) {
-            ((VerifyingIndexOutput)output).verify();
+            ((VerifyingIndexOutput) output).verify();
         }
     }
@@ -445,7 +427,7 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
 
     public static void verify(IndexInput input) throws IOException {
         if (input instanceof VerifyingIndexInput) {
-            ((VerifyingIndexInput)input).verify();
+            ((VerifyingIndexInput) input).verify();
         }
     }
@@ -505,7 +487,7 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
         List<CorruptIndexException> ex = new ArrayList<>();
         for (String file : files) {
             if (file.startsWith(CORRUPTED)) {
-                try(ChecksumIndexInput input = directory.openChecksumInput(file, IOContext.READONCE)) {
+                try (ChecksumIndexInput input = directory.openChecksumInput(file, IOContext.READONCE)) {
                     int version = CodecUtil.checkHeader(input, CODEC, VERSION_START, VERSION);
                     String msg = input.readString();
                     StringBuilder builder = new StringBuilder(shardId.toString());
@@ -531,9 +513,9 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
      * legacy checksum file. After the delete it pulls the latest metadata snapshot from the store and compares it
      * to the given snapshot. If the snapshots are inconsistent an illegal state exception is thrown
      *
-     * @param reason the reason for this cleanup operation logged for each deleted file
+     * @param reason         the reason for this cleanup operation logged for each deleted file
      * @param sourceMetaData the metadata used for cleanup. all files in this metadata should be kept around.
-     * @throws IOException if an IOException occurs
+     * @throws IOException                        if an IOException occurs
      * @throws ElasticsearchIllegalStateException if the latest snapshot in this store differs from the given one after the cleanup.
      */
     public void cleanupAndVerify(String reason, MetadataSnapshot sourceMetaData) throws IOException {
@@ -578,7 +560,7 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
             // this check ensures that the two files are consistent ie. if we don't have checksums only the rest needs to match we are just
             // verifying that we are consistent on both ends source and target
             final boolean hashAndLengthEqual = (
-                   local.checksum() == null
+                    local.checksum() == null
                             && remote.checksum() == null
                             && local.hash().equals(remote.hash())
                             && local.length() == remote.length());
@@ -591,7 +573,7 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
             } else {
                 logger.debug("Files are missing on the recovery target: {} ", recoveryDiff);
                 throw new ElasticsearchIllegalStateException("Files are missing on the recovery target: [different="
-                        + recoveryDiff.different + ", missing=" + recoveryDiff.missing +']', null);
+                        + recoveryDiff.different + ", missing=" + recoveryDiff.missing + ']', null);
             }
         }
     }
@@ -647,7 +629,7 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
      * Only files that are part of the last commit are considered in this datastrucutre.
      * For backwards compatibility the snapshot might include legacy checksums that
      * are derived from a dedicated checksum file written by older elasticsearch version pre 1.3
-     *
+     * <p>
      * Note: This class will ignore the segments.gen file since it's optional and might
      * change concurrently for safety reasons.
      *
@@ -716,8 +698,8 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
                     // TODO we should check the checksum in lucene if we hit an exception
                     Lucene.checkSegmentInfoIntegrity(directory);
                 } catch (CorruptIndexException | IndexFormatTooOldException | IndexFormatTooNewException cex) {
-                   cex.addSuppressed(ex);
-                   throw cex;
+                    cex.addSuppressed(ex);
+                    throw cex;
                 } catch (Throwable e) {
                     // ignore...
                 }
@@ -729,7 +711,7 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
 
         /**
          * Reads legacy checksum files found in the directory.
-         *
+         * <p>
          * Files are expected to start with _checksums- prefix
          * followed by long file version. Only file with the highest version is read, all other files are ignored.
          *
@@ -762,7 +744,7 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
         /**
          * Deletes all checksum files with version lower than newVersion.
         *
-         * @param directory the directory to clean
+         * @param directory  the directory to clean
          * @param newVersion the latest checksum file version
          * @throws IOException
          */
@@ -783,7 +765,7 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
             }
         }
 
-        private static void checksumFromLuceneFile(Directory directory, String file, ImmutableMap.Builder<String, StoreFileMetaData> builder,  ESLogger logger, Version version, boolean readFileAsHash) throws IOException {
+        private static void checksumFromLuceneFile(Directory directory, String file, ImmutableMap.Builder<String, StoreFileMetaData> builder, ESLogger logger, Version version, boolean readFileAsHash) throws IOException {
             final String checksum;
             final BytesRefBuilder fileHash = new BytesRefBuilder();
             try (final IndexInput in = directory.openInput(file, IOContext.READONCE)) {
@@ -824,7 +806,7 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
         /**
          * Computes a strong hash value for small files. Note that this method should only be used for files < 1MB
          */
         public static void hashFile(BytesRefBuilder fileHash, InputStream in, long size) throws IOException {
-            final int len = (int)Math.min(1024 * 1024, size); // for safety we limit this to 1MB
+            final int len = (int) Math.min(1024 * 1024, size); // for safety we limit this to 1MB
             fileHash.grow(len);
             fileHash.setLength(len);
             final int readBytes = Streams.readFully(in, fileHash.bytes(), 0, len);
@@ -853,38 +835,38 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
        /**
         * Returns a diff between the two snapshots that can be used for recovery. The given snapshot is treated as the
         * recovery target and this snapshot as the source. The returned diff will hold a list of files that are:
-        * <ul>
-        * <li>identical: they exist in both snapshots and they can be considered the same ie. they don't need to be recovered</li>
-        * <li>different: they exist in both snapshots but their they are not identical</li>
-        * <li>missing: files that exist in the source but not in the target</li>
-        * </ul>
+         * <ul>
+         * <li>identical: they exist in both snapshots and they can be considered the same ie. they don't need to be recovered</li>
+         * <li>different: they exist in both snapshots but their they are not identical</li>
+         * <li>missing: files that exist in the source but not in the target</li>
+         * </ul>
         * This method groups file into per-segment files and per-commit files. A file is treated as
         * identical if and on if all files in it's group are identical. On a per-segment level files for a segment are treated
         * as identical iff:
         * <ul>
-        * <li>all files in this segment have the same checksum</li>
-        * <li>all files in this segment have the same length</li>
-        * <li>the segments .si files hashes are byte-identical Note: This is a using a perfect hash function, The metadata transfers the .si file content as it's hash</li>
+         * <li>all files in this segment have the same checksum</li>
+         * <li>all files in this segment have the same length</li>
+         * <li>the segments .si files hashes are byte-identical Note: This is a using a perfect hash function, The metadata transfers the .si file content as it's hash</li>
         * </ul>
-        *
+         * <p>
        * The .si file contains a lot of diagnostics including a timestamp etc. in the future there might be
        * unique segment identifiers in there hardening this method further.
-        *
+         * <p>
        * The per-commit files handles very similar. A commit is composed of the segments_N files as well as generational files like
        * deletes (_x_y.del) or field-info (_x_y.fnm) files. On a per-commit level files for a commit are treated
        * as identical iff:
        * <ul>
-        * <li>all files belonging to this commit have the same checksum</li>
-        * <li>all files belonging to this commit have the same length</li>
-        * <li>the segments file segments_N files hashes are byte-identical Note: This is a using a perfect hash function, The metadata transfers the segments_N file content as it's hash</li>
+         * <li>all files belonging to this commit have the same checksum</li>
+         * <li>all files belonging to this commit have the same length</li>
+         * <li>the segments file segments_N files hashes are byte-identical Note: This is a using a perfect hash function, The metadata transfers the segments_N file content as it's hash</li>
        * </ul>
-        *
+         * <p>
        * NOTE: this diff will not contain the segments.gen file. This file is omitted on recovery.
        */
        public RecoveryDiff recoveryDiff(MetadataSnapshot recoveryTargetSnapshot) {
-           final ImmutableList.Builder<StoreFileMetaData> identical = ImmutableList.builder();
-           final ImmutableList.Builder<StoreFileMetaData> different = ImmutableList.builder();
-           final ImmutableList.Builder<StoreFileMetaData> missing = ImmutableList.builder();
+            final ImmutableList.Builder<StoreFileMetaData> identical = ImmutableList.builder();
+            final ImmutableList.Builder<StoreFileMetaData> different = ImmutableList.builder();
+            final ImmutableList.Builder<StoreFileMetaData> missing = ImmutableList.builder();
            final Map<String, List<StoreFileMetaData>> perSegment = new HashMap<>();
            final List<StoreFileMetaData> perCommitStoreFiles = new ArrayList<>();
@@ -896,7 +878,7 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
                final String extension = IndexFileNames.getExtension(meta.name());
                assert FIELD_INFOS_FILE_EXTENSION.equals(extension) == false || IndexFileNames.stripExtension(IndexFileNames.stripSegmentName(meta.name())).isEmpty() : "FieldInfos are generational but updateable DV are not supported in elasticsearch";
                if (IndexFileNames.SEGMENTS.equals(segmentId) || DEL_FILE_EXTENSION.equals(extension) || LIV_FILE_EXTENSION.equals(extension)) {
-                   // only treat del files as per-commit files fnm files are generational but only for upgradable DV
+                    // only treat del files as per-commit files fnm files are generational but only for upgradable DV
                    perCommitStoreFiles.add(meta);
                } else {
                    List<StoreFileMetaData> perSegStoreFiles = perSegment.get(segmentId);
@@ -931,8 +913,8 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
                }
            }
            RecoveryDiff recoveryDiff = new RecoveryDiff(identical.build(), different.build(), missing.build());
-            assert recoveryDiff.size() == this.metadata.size() - (metadata.containsKey(IndexFileNames.OLD_SEGMENTS_GEN) ? 1: 0)
-                    : "some files are missing recoveryDiff size: [" + recoveryDiff.size() + "] metadata size: [" + this.metadata.size() + "] contains segments.gen: [" + metadata.containsKey(IndexFileNames.OLD_SEGMENTS_GEN) + "]" ;
+            assert recoveryDiff.size() == this.metadata.size() - (metadata.containsKey(IndexFileNames.OLD_SEGMENTS_GEN) ? 1 : 0)
+                    : "some files are missing recoveryDiff size: [" + recoveryDiff.size() + "] metadata size: [" + this.metadata.size() + "] contains segments.gen: [" + metadata.containsKey(IndexFileNames.OLD_SEGMENTS_GEN) + "]";
            return recoveryDiff;
        }
 
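The grouping rules spelled out in the javadoc above boil down to: segments_N and generational files (.del/.liv) are compared as one per-commit unit, everything else is grouped by its segment. A simplified, self-contained sketch of that split - plain strings stand in for StoreFileMetaData, and the name parsing is deliberately naive compared to Lucene's IndexFileNames helpers:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class FileGrouping {
        static Map<String, List<String>> group(List<String> files) {
            final Map<String, List<String>> groups = new HashMap<>();
            for (String file : files) {
                final String key;
                if (file.startsWith("segments") || file.endsWith(".del") || file.endsWith(".liv")) {
                    key = "<per-commit>"; // compared as one unit with segments_N
                } else {
                    final int dot = file.indexOf('.');
                    key = dot == -1 ? file : file.substring(0, dot); // "_0.cfs" -> segment "_0"
                }
                List<String> group = groups.get(key);
                if (group == null) {
                    groups.put(key, group = new ArrayList<>());
                }
                group.add(file);
            }
            return groups;
        }

        public static void main(String[] args) {
            Map<String, List<String>> groups =
                    group(Arrays.asList("segments_2", "_0.si", "_0.cfs", "_0_1.del", "_1.si"));
            System.out.println(groups.get("<per-commit>")); // [segments_2, _0_1.del]
            System.out.println(groups.get("_0"));           // [_0.si, _0.cfs]
        }
    }

A whole group is then marked identical only if every file in it matches; one mismatch sends the entire segment (or commit) to the "different" list, which is what makes the diff safe to recover from.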
@@ -978,11 +960,12 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
         /**
          * A class representing the diff between a recovery source and recovery target
+         *
          * @see MetadataSnapshot#recoveryDiff(org.elasticsearch.index.store.Store.MetadataSnapshot)
          */
         public static final class RecoveryDiff {
             /**
-            * Files that exist in both snapshots and they can be considered the same ie. they don't need to be recovered
+             * Files that exist in both snapshots and they can be considered the same ie. they don't need to be recovered
              */
             public final List<StoreFileMetaData> identical;
             /**
@@ -1122,7 +1105,7 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
         public void writeBytes(byte[] b, int offset, int length) throws IOException {
             if (writtenBytes + length > checksumPosition && actualChecksum == null) {
                 assert writtenBytes <= checksumPosition;
-                final int bytesToWrite = (int)(checksumPosition-writtenBytes);
+                final int bytesToWrite = (int) (checksumPosition - writtenBytes);
                 out.writeBytes(b, offset, bytesToWrite);
                 readAndCompareChecksum();
                 offset += bytesToWrite;
@@ -1137,7 +1120,7 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
     /**
      * Index input that calculates checksum as data is read from the input.
-     *
+     * <p>
      * This class supports random access (it is possible to seek backward and forward) in order to accommodate retry
      * mechanism that is used in some repository plugins (S3 for example). However, the checksum is only calculated on
      * the first read. All consecutive reads of the same data are not used to calculate the checksum.
      */
@@ -1183,7 +1166,7 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
                 input.readBytes(b, offset, len);
                 if (pos + len > verifiedPosition) {
                     // Conversion to int is safe here because (verifiedPosition - pos) can be at most len, which is integer
-                    int alreadyVerified = (int)Math.max(0, verifiedPosition - pos);
+                    int alreadyVerified = (int) Math.max(0, verifiedPosition - pos);
                     if (pos < checksumPosition) {
                         if (pos + len < checksumPosition) {
                             digest.update(b, offset + alreadyVerified, len - alreadyVerified);
@@ -1285,7 +1268,7 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
         ensureOpen();
         if (!isMarkedCorrupted()) {
             String uuid = CORRUPTED + Strings.randomBase64UUID();
-            try(IndexOutput output = this.directory().createOutput(uuid, IOContext.DEFAULT)) {
+            try (IndexOutput output = this.directory().createOutput(uuid, IOContext.DEFAULT)) {
                 CodecUtil.writeHeader(output, CODEC, VERSION);
                 output.writeString(ExceptionsHelper.detailedMessage(exception, true, 0)); // handles null exception
                 output.writeString(ExceptionsHelper.stackTrace(exception));
@@ -1296,17 +1279,4 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
             directory().sync(Collections.singleton(uuid));
         }
     }
-
-    /**
-     * A listener that is called once this store is closed and all references are released
-     */
-    public static interface OnCloseListener {
-
-        /**
-         * Called once the store is closed and all references are released.
-         *
-         * @param shardId the shard ID the calling store belongs to.
-         */
-        public void onClose(ShardId shardId);
-    }
 }
diff --git a/src/main/java/org/elasticsearch/indices/IndicesService.java b/src/main/java/org/elasticsearch/indices/IndicesService.java
index 1b379aba725..792d5495d06 100644
--- a/src/main/java/org/elasticsearch/indices/IndicesService.java
+++ b/src/main/java/org/elasticsearch/indices/IndicesService.java
@@ -35,7 +35,6 @@ import org.elasticsearch.common.inject.*;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.concurrent.EsExecutors;
 import org.elasticsearch.env.NodeEnvironment;
-import org.elasticsearch.gateway.Gateway;
 import org.elasticsearch.index.*;
 import org.elasticsearch.index.aliases.IndexAliasesServiceModule;
 import org.elasticsearch.index.analysis.AnalysisModule;
@@ -133,23 +132,11 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService> i
                 @Override
                 public void run() {
                     try {
-                        removeIndex(index, "shutdown", false, new IndexCloseListener() {
-                            @Override
-                            public void onAllShardsClosed(Index index, List<Throwable> failures) {
-                                latch.countDown();
-                            }
-
-                            @Override
-                            public void onShardClosed(ShardId shardId) {
-                            }
-
-                            @Override
-                            public void onShardCloseFailed(ShardId shardId, Throwable t) {
-                            }
-                        });
+                        removeIndex(index, "shutdown", false);
                     } catch (Throwable e) {
-                        latch.countDown();
                         logger.warn("failed to delete index on stop [" + index + "]", e);
+                    } finally {
+                        latch.countDown();
                     }
                 }
             });
@@ -344,7 +331,7 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService> i
      * @param reason the high level reason causing this removal
      */
     public void removeIndex(String index, String reason) throws ElasticsearchException {
-        removeIndex(index, reason, false, null);
+        removeIndex(index, reason, false);
     }
 
     /**
@@ -357,40 +344,10 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService> i
      * @param reason the high level reason causing this delete
      */
     public void deleteIndex(String index, String reason) throws ElasticsearchException {
-        removeIndex(index, reason, true, new IndexCloseListener() {
-
-            @Override
-            public void onAllShardsClosed(Index index, List<Throwable> failures) {
-                try {
-                    nodeEnv.deleteIndexDirectorySafe(index);
-                    logger.debug("deleted index [{}] from filesystem - failures {}", index, failures);
-                } catch (Exception e) {
-                    for (Throwable t : failures) {
-                        e.addSuppressed(t);
-                    }
-                    logger.debug("failed to deleted index [{}] from filesystem", e, index);
-                    // ignore - still some shards locked here
-                }
-            }
-
-            @Override
-            public void onShardClosed(ShardId shardId) {
-                try {
-                    // this is called under the shard lock - we can safely delete it
-                    IOUtils.rm(nodeEnv.shardPaths(shardId));
-                    logger.debug("deleted shard [{}] from filesystem", shardId);
-                } catch (IOException e) {
-                    logger.warn("Can't delete shard {} ", e, shardId);
-                }
-            }
-
-            @Override
-            public void onShardCloseFailed(ShardId shardId, Throwable t) {
-            }
-        });
+        removeIndex(index, reason, true);
     }
 
-    private void removeIndex(String index, String reason, boolean delete, @Nullable IndexCloseListener listener) throws ElasticsearchException {
+    private void removeIndex(String index, String reason, boolean delete) throws ElasticsearchException {
         try {
             final IndexService indexService;
             final Injector indexInjector;
@@ -418,7 +375,7 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService> i
             }));
 
             logger.debug("[{}] closing index service (reason [{}])", index, reason);
-            ((IndexService) indexService).close(reason, listener);
+            indexService.close(reason);
 
             logger.debug("[{}] closing index cache (reason [{}])", index, reason);
             indexInjector.getInstance(IndexCache.class).close();
@@ -468,33 +425,4 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService> i
             }
         }
     }
-
-    /**
-     * A listener interface that can be used to get notification once a shard or all shards
-     * of an certain index that are allocated on a node are actually closed. The listener methods
-     * are invoked once the actual low level instance modifying or reading a shard are closed in contrast to
-     * removal methods that might return earlier.
-     */
-    public static interface IndexCloseListener {
-
-        /**
-         * Invoked once all shards are closed or their closing failed.
-         * @param index the index that got closed
-         * @param failures the recorded shard closing failures
-         */
-        public void onAllShardsClosed(Index index, List<Throwable> failures);
-
-        /**
-         * Invoked once the last resource using the given shard ID is released.
-         * Yet, this method is called while still holding the shards lock such that
-         * operations on the shards data can safely be executed in this callback.
-         */
-        public void onShardClosed(ShardId shardId);
-
-        /**
-         * Invoked if closing the given shard failed.
-         */
-        public void onShardCloseFailed(ShardId shardId, Throwable t);
-
-    }
 }
\ No newline at end of file
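One side effect of the shutdown hunk above: latch.countDown() now sits in a finally block instead of the listener callback and the catch block, so await() cannot hang regardless of whether the close succeeds or throws. The same pattern in isolation, with hypothetical names:

    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class LatchShutdown {
        public static void main(String[] args) throws InterruptedException {
            final String[] indices = {"idx-1", "idx-2", "idx-3"};
            final CountDownLatch latch = new CountDownLatch(indices.length);
            ExecutorService executor = Executors.newFixedThreadPool(indices.length);
            for (final String index : indices) {
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            // close the index; may throw
                            System.out.println("closing " + index);
                        } catch (Throwable t) {
                            System.err.println("failed to close " + index + ": " + t);
                        } finally {
                            latch.countDown(); // always runs exactly once per task
                        }
                    }
                });
            }
            latch.await(); // safe: every path counts down
            executor.shutdown();
        }
    }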
diff --git a/src/test/java/org/elasticsearch/env/NodeEnvironmentTests.java b/src/test/java/org/elasticsearch/env/NodeEnvironmentTests.java
index 5187326d4b5..c2417960f70 100644
--- a/src/test/java/org/elasticsearch/env/NodeEnvironmentTests.java
+++ b/src/test/java/org/elasticsearch/env/NodeEnvironmentTests.java
@@ -23,6 +23,7 @@ import org.apache.lucene.util.IOUtils;
 import org.elasticsearch.ElasticsearchIllegalStateException;
 import org.elasticsearch.common.settings.ImmutableSettings;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.util.concurrent.AbstractRunnable;
 import org.elasticsearch.index.Index;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.test.ElasticsearchTestCase;
@@ -37,6 +38,7 @@ import java.util.List;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 
 public class NodeEnvironmentTests extends ElasticsearchTestCase {
 
@@ -99,7 +101,7 @@ public class NodeEnvironmentTests extends ElasticsearchTestCase {
         }
 
         try {
-            env.lockAllForIndex(new Index("foo"));
+            env.lockAllForIndex(new Index("foo"), randomIntBetween(0, 10));
             fail("shard 1 is locked");
         } catch (LockObtainFailedException ex) {
             // expected
@@ -109,7 +111,7 @@ public class NodeEnvironmentTests extends ElasticsearchTestCase {
 
         // can lock again?
         env.shardLock(new ShardId("foo", 1)).close();
-        List<ShardLock> locks = env.lockAllForIndex(new Index("foo"));
+        List<ShardLock> locks = env.lockAllForIndex(new Index("foo"), randomIntBetween(0, 10));
         try {
             env.shardLock(new ShardId("foo", randomBoolean() ? 1 : 2));
             fail("shard is locked");
@@ -172,7 +174,7 @@ public class NodeEnvironmentTests extends ElasticsearchTestCase {
         }
 
         try {
-            env.deleteIndexDirectorySafe(new Index("foo"));
+            env.deleteIndexDirectorySafe(new Index("foo"), randomIntBetween(0, 10));
             fail("shard is locked");
         } catch (LockObtainFailedException ex) {
             // expected
@@ -183,7 +185,27 @@ public class NodeEnvironmentTests extends ElasticsearchTestCase {
             assertTrue(Files.exists(path));
         }
 
-        env.deleteIndexDirectorySafe(new Index("foo"));
+        final AtomicReference<Throwable> threadException = new AtomicReference<>();
+        if (randomBoolean()) {
+            Thread t = new Thread(new AbstractRunnable() {
+                @Override
+                public void onFailure(Throwable t) {
+                    threadException.set(t);
+                }
+
+                @Override
+                protected void doRun() throws Exception {
+                    try (ShardLock fooLock = env.shardLock(new ShardId("foo", 1))) {
+                        Thread.sleep(100);
+                    }
+                }
+            });
+            t.start();
+        }
+
+        env.deleteIndexDirectorySafe(new Index("foo"), 5000);
+
+        assertNull(threadException.get());
 
         for (Path path : env.indexPaths(new Index("foo"))) {
             assertFalse(Files.exists(path));
diff --git a/src/test/java/org/elasticsearch/index/store/StoreTest.java b/src/test/java/org/elasticsearch/index/store/StoreTest.java
index b63c2eb2d79..7d9f570fe19 100644
--- a/src/test/java/org/elasticsearch/index/store/StoreTest.java
+++ b/src/test/java/org/elasticsearch/index/store/StoreTest.java
@@ -19,15 +19,19 @@
 package org.elasticsearch.index.store;
 
 import org.apache.lucene.analysis.MockAnalyzer;
-import org.apache.lucene.codecs.*;
+import org.apache.lucene.codecs.CodecUtil;
+import org.apache.lucene.codecs.FilterCodec;
+import org.apache.lucene.codecs.SegmentInfoFormat;
 import org.apache.lucene.codecs.lucene50.Lucene50Codec;
 import org.apache.lucene.codecs.lucene50.Lucene50SegmentInfoFormat;
 import org.apache.lucene.document.*;
 import org.apache.lucene.index.*;
 import org.apache.lucene.store.*;
-import org.apache.lucene.util.*;
+import org.apache.lucene.util.BytesRef;
+import org.apache.lucene.util.IOUtils;
+import org.apache.lucene.util.TestUtil;
+import org.apache.lucene.util.Version;
 import org.elasticsearch.common.settings.ImmutableSettings;
-import org.elasticsearch.env.ShardLock;
 import org.elasticsearch.index.Index;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.index.store.distributor.Distributor;
@@ -35,6 +39,7 @@ import org.elasticsearch.index.store.distributor.LeastUsedDistributor;
 import org.elasticsearch.index.store.distributor.RandomWeightedDistributor;
 import org.elasticsearch.test.DummyShardLock;
 import org.elasticsearch.test.ElasticsearchLuceneTestCase;
+import org.hamcrest.Matchers;
 import org.junit.Test;
 
 import java.io.FileNotFoundException;
@@ -71,13 +76,7 @@ public class StoreTest extends ElasticsearchLuceneTestCase {
 
         store.incRef();
         final AtomicBoolean called = new AtomicBoolean(false);
-        Store.OnCloseListener listener = new Store.OnCloseListener() {
-            @Override
-            public void onClose(ShardId shardId) {
-                assertTrue(called.compareAndSet(false, true));
-            }
-        };
-        store.close(listener);
+        store.close();
         for (int i = 0; i < incs; i++) {
             if (randomBoolean()) {
                 store.incRef();
@@ -92,9 +91,8 @@ public class StoreTest extends ElasticsearchLuceneTestCase {
             store.ensureOpen();
         }
 
-        assertFalse(called.get());
         store.decRef();
-        assertTrue(called.get());
+        assertThat(store.refCount(), Matchers.equalTo(0));
         assertFalse(store.tryIncRef());
         try {
             store.incRef();
@@ -110,27 +108,6 @@ public class StoreTest extends ElasticsearchLuceneTestCase {
         }
     }
 
-    @Test
-    public void testListenerCanThrowException() throws IOException {
-        final ShardId shardId = new ShardId(new Index("index"), 1);
-        DirectoryService directoryService = new LuceneManagedDirectoryService(random());
-        final ShardLock shardLock = new DummyShardLock(shardId);
-        Store store = new Store(shardId, ImmutableSettings.EMPTY, directoryService, randomDistributor(directoryService), shardLock);
-        final AtomicBoolean called = new AtomicBoolean(false);
-        Store.OnCloseListener listener = new Store.OnCloseListener() {
-            @Override
-            public void onClose(ShardId shardId) {
-                assertTrue(called.compareAndSet(false, true));
-                throw new RuntimeException("foobar");
-            }
-        };
-        assertTrue(shardLock.isOpen());
-        store.close(listener);
-        assertTrue(called.get());
-        assertFalse(shardLock.isOpen());
-        // test will barf if the directory is not closed
-    }
-
     @Test
     public void testVerifyingIndexOutput() throws IOException {
         Directory dir = newDirectory();