Internal: remove IndexCloseListener & Store.OnCloseListener

Closes #9009
Boaz Leskes 2014-12-18 16:34:05 +01:00
parent c077683248
commit 4d699bd76c
7 changed files with 145 additions and 299 deletions


@ -22,11 +22,7 @@ package org.elasticsearch.env;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import com.google.common.primitives.Ints;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.store.Lock;
import org.apache.lucene.store.LockObtainFailedException;
import org.apache.lucene.store.NativeFSLockFactory;
import org.apache.lucene.store.*;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.cluster.node.DiscoveryNode;
@ -42,7 +38,8 @@ import java.io.Closeable;
import java.io.IOException;
import java.nio.file.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
/**
@ -166,10 +163,11 @@ public class NodeEnvironment extends AbstractComponent implements Closeable{
* none of the shards will be deleted
*
* @param index the index to delete
* @param lockTimeoutMS how long to wait for acquiring the indices shard locks
* @throws Exception if any of the shards data directories can't be locked or deleted
*/
public void deleteIndexDirectorySafe(Index index) throws IOException {
final List<ShardLock> locks = lockAllForIndex(index);
public void deleteIndexDirectorySafe(Index index, long lockTimeoutMS) throws IOException {
final List<ShardLock> locks = lockAllForIndex(index, lockTimeoutMS);
try {
final Path[] indexPaths = new Path[nodeIndicesPaths.length];
for (int i = 0; i < indexPaths.length; i++) {
@ -188,16 +186,19 @@ public class NodeEnvironment extends AbstractComponent implements Closeable{
a {@link LockObtainFailedException} is thrown and all previously acquired locks are released.
*
* @param index the index to lock shards for
* @param lockTimeoutMS how long to wait for acquiring the indices shard locks
* @return the {@link ShardLock} instances for this index.
* @throws IOException if an IOException occurs.
*/
public List<ShardLock> lockAllForIndex(Index index) throws IOException {
public List<ShardLock> lockAllForIndex(Index index, long lockTimeoutMS) throws IOException {
Set<ShardId> allShardIds = findAllShardIds(index);
List<ShardLock> allLocks = new ArrayList<>();
boolean success = false;
long startTime = System.currentTimeMillis();
try {
for (ShardId shardId : allShardIds) {
allLocks.add(shardLock(shardId));
long timeoutLeft = Math.max(0, lockTimeoutMS - (System.currentTimeMillis() - startTime));
allLocks.add(shardLock(shardId, timeoutLeft));
}
success = true;
} finally {
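For illustration, a hedged caller-side sketch of the new timeout-aware API (the wrapping method and the 5000 ms budget are assumptions, not part of this commit). The timeout is a single budget shared across all shard locks, and the returned locks should always be released in a finally block, as the gateway code below does:

import org.apache.lucene.util.IOUtils;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.env.ShardLock;
import org.elasticsearch.index.Index;
import java.io.IOException;
import java.util.List;

void withAllShardsLocked(NodeEnvironment nodeEnv, Index index) throws IOException {
    // waits at most 5s in total; throws LockObtainFailedException if a shard stays locked
    final List<ShardLock> locks = nodeEnv.lockAllForIndex(index, 5000);
    try {
        // all shards of the index are locked here; safe to inspect or delete their data
    } finally {
        IOUtils.closeWhileHandlingException(locks); // ShardLock is Closeable
    }
}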


@ -43,7 +43,10 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.common.xcontent.*;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.env.ShardLock;
import org.elasticsearch.index.Index;
@ -72,6 +75,7 @@ public class GatewayMetaState extends AbstractComponent implements ClusterStateL
private static final String DEPRECATED_SETTING_ROUTING_HASH_FUNCTION = "cluster.routing.operation.hash.type";
private static final String DEPRECATED_SETTING_ROUTING_USE_TYPE = "cluster.routing.operation.use_type";
public static final String GATEWAY_DANGLING_TIMEOUT = "gateway.dangling_timeout";
public static final String GATEWAY_DELETE_TIMEOUT = "gateway.delete_timeout";
public static final String GATEWAY_AUTO_IMPORT_DANGLED = "gateway.auto_import_dangled";
// legacy - this used to be in a different package
private static final String GATEWAY_LOCAL_DANGLING_TIMEOUT = "gateway.local.dangling_timeout";
@ -127,6 +131,7 @@ public class GatewayMetaState extends AbstractComponent implements ClusterStateL
private final AutoImportDangledState autoImportDangled;
private final TimeValue danglingTimeout;
private final TimeValue deleteTimeout;
private final Map<String, DanglingIndex> danglingIndices = ConcurrentCollections.newConcurrentMap();
private final Object danglingMutex = new Object();
@ -159,8 +164,12 @@ public class GatewayMetaState extends AbstractComponent implements ClusterStateL
this.autoImportDangled = AutoImportDangledState.fromString(settings.get(GATEWAY_AUTO_IMPORT_DANGLED, settings.get(GATEWAY_LOCAL_AUTO_IMPORT_DANGLED, AutoImportDangledState.YES.toString())));
this.danglingTimeout = settings.getAsTime(GATEWAY_DANGLING_TIMEOUT, settings.getAsTime(GATEWAY_LOCAL_DANGLING_TIMEOUT, TimeValue.timeValueHours(2)));
this.deleteTimeout = settings.getAsTime(GATEWAY_DELETE_TIMEOUT, TimeValue.timeValueSeconds(30));
logger.debug("using gateway.local.auto_import_dangled [{}], with gateway.dangling_timeout [{}]", this.autoImportDangled, this.danglingTimeout);
logger.debug("using {} [{}], {} [{}], with {} [{}]",
GATEWAY_AUTO_IMPORT_DANGLED, this.autoImportDangled,
GATEWAY_DELETE_TIMEOUT, this.deleteTimeout,
GATEWAY_DANGLING_TIMEOUT, this.danglingTimeout);
if (DiscoveryNode.masterNode(settings) || DiscoveryNode.dataNode(settings)) {
nodeEnv.ensureAtomicMoveSupported();
}
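A hedged sketch of how the new gateway.delete_timeout setting resolves (the builder usage and the 10s override are illustrative; getAsTime and the 30s default come from the hunk above):

import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;

Settings settings = ImmutableSettings.settingsBuilder()
        .put("gateway.delete_timeout", "10s") // optional override; defaults to 30s
        .build();
TimeValue deleteTimeout = settings.getAsTime(GatewayMetaState.GATEWAY_DELETE_TIMEOUT,
        TimeValue.timeValueSeconds(30));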
@ -258,7 +267,10 @@ public class GatewayMetaState extends AbstractComponent implements ClusterStateL
try {
final Index idx = new Index(current.index());
MetaDataStateFormat.deleteMetaState(nodeEnv.indexPaths(idx));
nodeEnv.deleteIndexDirectorySafe(idx);
// it may take a couple of seconds for outstanding shard references
// to be released (for example, by ongoing recoveries)
// we are working on a better solution, see: https://github.com/elasticsearch/elasticsearch/pull/8608
nodeEnv.deleteIndexDirectorySafe(idx, deleteTimeout.millis());
} catch (LockObtainFailedException ex) {
logger.debug("[{}] failed to delete index - at least one shards is still locked", ex, current.index());
} catch (Exception ex) {
@ -302,14 +314,14 @@ public class GatewayMetaState extends AbstractComponent implements ClusterStateL
try {
// the index deletion might not have worked due to shards still being locked
// we have three cases here:
// - we acquired all shards locks here --> we can import the dangeling index
// - we acquired all shards locks here --> we can import the dangling index
// - we failed to acquire the lock --> somebody else uses it - DON'T IMPORT
// - we acquired successfully but the lock list is empty --> no shards present - DON'T IMPORT
// in the last case we should in fact try to delete the directory since it might be a leftover...
final List<ShardLock> shardLocks = nodeEnv.lockAllForIndex(index);
final List<ShardLock> shardLocks = nodeEnv.lockAllForIndex(index, 0);
if (shardLocks.isEmpty()) {
// no shards - try to remove the directory
nodeEnv.deleteIndexDirectorySafe(index);
nodeEnv.deleteIndexDirectorySafe(index, 0);
continue;
}
IOUtils.closeWhileHandlingException(shardLocks);
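The zero timeout turns lockAllForIndex into a non-blocking probe. A hedged sketch of the three-way decision above (simplified; importDanglingIndex is a hypothetical helper standing in for the actual import path):

try {
    List<ShardLock> shardLocks = nodeEnv.lockAllForIndex(index, 0); // 0 = don't wait
    if (shardLocks.isEmpty()) {
        nodeEnv.deleteIndexDirectorySafe(index, 0); // no shards at all: leftover directory
    } else {
        IOUtils.closeWhileHandlingException(shardLocks); // we could lock every shard...
        importDanglingIndex(index);                      // ...so nobody else owns the index
    }
} catch (LockObtainFailedException ex) {
    // at least one shard is locked by someone else - DON'T IMPORT
}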
@ -323,7 +335,7 @@ public class GatewayMetaState extends AbstractComponent implements ClusterStateL
} else if (danglingTimeout.millis() == 0) {
logger.info("[{}] dangling index, exists on local file system, but not in cluster metadata, timeout set to 0, deleting now", indexName);
try {
nodeEnv.deleteIndexDirectorySafe(index);
nodeEnv.deleteIndexDirectorySafe(index, 0);
} catch (LockObtainFailedException ex) {
logger.debug("[{}] failed to delete index - at least one shards is still locked", ex, indexName);
} catch (Exception ex) {
@ -558,7 +570,7 @@ public class GatewayMetaState extends AbstractComponent implements ClusterStateL
try {
MetaDataStateFormat.deleteMetaState(nodeEnv.indexPaths(index));
nodeEnv.deleteIndexDirectorySafe(index);
nodeEnv.deleteIndexDirectorySafe(index, 0);
} catch (Exception ex) {
logger.debug("failed to delete dangling index", ex);
}


@ -29,7 +29,6 @@ import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.inject.*;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.CountDown;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.env.ShardLock;
import org.elasticsearch.index.aliases.IndexAliasesService;
@ -74,17 +73,14 @@ import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogModule;
import org.elasticsearch.index.translog.TranslogService;
import org.elasticsearch.indices.IndicesLifecycle;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.InternalIndicesLifecycle;
import org.elasticsearch.plugins.PluginsService;
import org.elasticsearch.plugins.ShardsPluginsModule;
import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@ -241,14 +237,12 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone
return aliasesService;
}
public synchronized void close(final String reason, final IndicesService.IndexCloseListener listener) {
public synchronized void close(final String reason) {
if (closed.compareAndSet(false, true)) {
final Set<Integer> shardIds = shardIds();
final IndicesService.IndexCloseListener innerListener = listener == null ? null :
new PerShardIndexCloseListener(shardIds, listener);
for (final int shardId : shardIds) {
try {
removeShard(shardId, reason, innerListener);
removeShard(shardId, reason);
} catch (Throwable t) {
logger.warn("failed to close shard", t);
}
@ -350,12 +344,7 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone
}
}
public void removeShard(int shardId, String reason) throws ElasticsearchException {
removeShard(shardId, reason, null);
}
public synchronized void removeShard(int shardId, String reason, @Nullable final IndicesService.IndexCloseListener listener) throws ElasticsearchException {
boolean listenerPassed = false;
public synchronized void removeShard(int shardId, String reason) throws ElasticsearchException {
final ShardId sId = new ShardId(index, shardId);
try {
final Injector shardInjector;
@ -441,17 +430,7 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone
final Store store = shardInjector.getInstance(Store.class);
// and close it
try {
listenerPassed = true;
if (listener == null) {
store.close();
} else {
store.close(new Store.OnCloseListener() {
@Override
public void onClose(ShardId shardId) {
listener.onShardClosed(shardId);
}
});
}
} catch (Throwable e) {
logger.warn("[{}] failed to close store on shard deletion", e, shardId);
}
@ -459,51 +438,8 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone
logger.debug("[{}] closed (reason: [{}])", shardId, reason);
} catch (Throwable t) {
if (listenerPassed == false && listener != null) { // only notify if the listener wasn't passed to the store
listener.onShardCloseFailed(sId, t);
}
throw t;
}
}
private static final class PerShardIndexCloseListener implements IndicesService.IndexCloseListener {
final CountDown countDown;
final List<Throwable> failures;
private final Set<Integer> shardIds;
private final IndicesService.IndexCloseListener listener;
public PerShardIndexCloseListener(Set<Integer> shardIds, IndicesService.IndexCloseListener listener) {
this.shardIds = shardIds;
this.listener = listener;
countDown = new CountDown(shardIds.size());
failures = new CopyOnWriteArrayList<>();
}
@Override
public void onAllShardsClosed(Index index, List<Throwable> failures) {
assert false : "nobody should call this";
}
@Override
public void onShardClosed(ShardId shardId) {
assert countDown.isCountedDown() == false;
assert shardIds.contains(shardId.getId()) : "Unknown shard id";
listener.onShardClosed(shardId);
if (countDown.countDown()) {
listener.onAllShardsClosed(shardId.index(), failures);
}
}
@Override
public void onShardCloseFailed(ShardId shardId, Throwable t) {
assert countDown.isCountedDown() == false;
assert shardIds.contains(shardId.getId()) : "Unknown shard id";
listener.onShardCloseFailed(shardId, t);
failures.add(t);
if (countDown.countDown()) {
listener.onAllShardsClosed(shardId.index(), failures);
}
}
}
}
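With the listener machinery gone, closing an index is fully synchronous. A hedged summary of the resulting call path (pseudo-structure, not verbatim source):

// IndexService.close(reason) now simply iterates the shard ids:
for (final int shardId : shardIds()) {
    removeShard(shardId, reason); // tears the shard down, ending with store.close()
}
// store.close() only drops the store's own reference; the shard lock is released
// in closeInternal() once the last outstanding reference is gone.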


@ -28,7 +28,6 @@ import org.apache.lucene.store.*;
import org.apache.lucene.util.*;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.inject.Inject;
@ -45,12 +44,10 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AbstractRefCounted;
import org.elasticsearch.common.util.concurrent.RefCounted;
import org.elasticsearch.env.ShardLock;
import org.elasticsearch.index.codec.CodecService;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.distributor.Distributor;
import org.elasticsearch.indices.recovery.RecoveryFailedException;
import java.io.*;
import java.nio.file.NoSuchFileException;
@ -69,7 +66,7 @@ import java.util.zip.Checksum;
* This class also provides access to metadata information like checksums for committed files. A committed
* file is a file that belongs to a segment written by a Lucene commit. Files that have not been committed
* ie. created during a merge or a shard refresh / NRT reopen are not considered in the MetadataSnapshot.
*
* <p/>
Note: If you use a store, its reference count should be increased before using it by calling #incRef and a
* corresponding #decRef must be called in a try/finally block to release the store again ie.:
* <pre>
@ -103,7 +100,6 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
Store.this.closeInternal();
}
};
private volatile OnCloseListener onClose;
@Inject
public Store(ShardId shardId, @IndexSettings Settings indexSettings, DirectoryService directoryService, Distributor distributor, ShardLock shardLock) throws IOException {
@ -122,6 +118,7 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
/**
* Returns the last committed segments info for this store
*
* @throws IOException if the index is corrupted or the segments file is not present
*/
public SegmentInfos readLastCommittedSegmentsInfo() throws IOException {
@ -130,6 +127,7 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
/**
* Returns the segments info for the given commit or for the latest commit if the given commit is <code>null</code>
*
* @throws IOException if the index is corrupted or the segments file is not present
*/
private static SegmentInfos readSegmentsInfo(IndexCommit commit, Directory directory) throws IOException {
@ -155,16 +153,17 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
/**
* Returns a new MetadataSnapshot for the latest commit in this store or
* an empty snapshot if no index exists or can not be opened.
*
* @throws CorruptIndexException if the lucene index is corrupted. This can be caused by a checksum mismatch or an
* unexpected exception when opening the index reading the segments file.
* unexpected exception when opening the index reading the segments file.
*/
public MetadataSnapshot getMetadataOrEmpty() throws IOException {
try {
return getMetadata(null);
} catch (IndexNotFoundException ex) {
// that's fine - happens all the time no need to log
// that's fine - happens all the time no need to log
} catch (FileNotFoundException | NoSuchFileException ex) {
logger.info("Failed to open / find files while reading metadata snapshot");
logger.info("Failed to open / find files while reading metadata snapshot");
}
return MetadataSnapshot.EMPTY;
}
@ -172,10 +171,10 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
/**
* Returns a new MetadataSnapshot for the latest commit in this store.
*
* @throws CorruptIndexException if the lucene index is corrupted. This can be caused by a checksum mismatch or an
* unexpected exception when opening the index reading the segments file.
* @throws FileNotFoundException if one or more files referenced by a commit are not present.
* @throws NoSuchFileException if one or more files referenced by a commit are not present.
* @throws CorruptIndexException if the lucene index is corrupted. This can be caused by a checksum mismatch or an
* unexpected exception when opening the index reading the segments file.
* @throws FileNotFoundException if one or more files referenced by a commit are not present.
* @throws NoSuchFileException if one or more files referenced by a commit are not present.
* @throws IndexNotFoundException if no index / valid commit-point can be found in this store
*/
public MetadataSnapshot getMetadata() throws IOException {
@ -186,10 +185,10 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
* Returns a new MetadataSnapshot for the given commit. If the given commit is <code>null</code>
* the latest commit point is used.
*
* @throws CorruptIndexException if the lucene index is corrupted. This can be caused by a checksum mismatch or an
* unexpected exception when opening the index reading the segments file.
* @throws FileNotFoundException if one or more files referenced by a commit are not present.
* @throws NoSuchFileException if one or more files referenced by a commit are not present.
* @throws CorruptIndexException if the lucene index is corrupted. This can be caused by a checksum mismatch or an
* unexpected exception when opening the index reading the segments file.
* @throws FileNotFoundException if one or more files referenced by a commit are not present.
* @throws NoSuchFileException if one or more files referenced by a commit are not present.
* @throws IndexNotFoundException if the commit point can't be found in this store
*/
public MetadataSnapshot getMetadata(IndexCommit commit) throws IOException {
@ -290,11 +289,12 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
* corresponding {@link #decRef}, in a finally clause; otherwise the store may never be closed. Note that
* {@link #close} simply calls decRef(), which means that the Store will not really be closed until {@link
* #decRef} has been called for all outstanding references.
*
* <p/>
* Note: Close can safely be called multiple times.
*
* @throws AlreadyClosedException iff the reference counter can not be incremented.
* @see #decRef
* @see #tryIncRef()
* @throws AlreadyClosedException iff the reference counter can not be incremented.
*/
@Override
public final void incRef() {
@ -308,8 +308,9 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
* corresponding {@link #decRef}, in a finally clause; otherwise the store may never be closed. Note that
* {@link #close} simply calls decRef(), which means that the Store will not really be closed until {@link
* #decRef} has been called for all outstanding references.
*
* <p/>
* Note: Close can safely be called multiple times.
*
* @see #decRef()
* @see #incRef()
*/
@ -321,6 +322,7 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
/**
* Decreases the refCount of this Store instance. If the refCount drops to 0, then this
* store is closed.
*
* @see #incRef
*/
@Override
@ -330,17 +332,8 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
@Override
public void close() {
close(null);
}
/**
* Closes this store and installs the given {@link org.elasticsearch.index.store.Store.OnCloseListener}
* to be notified once all references to this store are released and the store is closed.
*/
public void close(@Nullable OnCloseListener onClose) {
if (isClosed.compareAndSet(false, true)) {
assert this.onClose == null : "OnClose listener is already set";
this.onClose = onClose;
// only do this once!
decRef();
logger.debug("store reference count on close: " + refCounter.refCount());
@ -348,36 +341,25 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
}
private void closeInternal() {
final OnCloseListener listener = onClose;
onClose = null;
try {
directory.innerClose(); // this closes the distributorDirectory as well
} catch (IOException e) {
logger.debug("failed to close directory", e);
} finally {
try {
if (listener != null) {
listener.onClose(shardId);
}
} catch (Exception ex){
logger.debug("OnCloseListener threw an exception", ex);
} finally {
IOUtils.closeWhileHandlingException(shardLock);
}
IOUtils.closeWhileHandlingException(shardLock);
}
}
/**
* Reads a MetadataSnapshot from the given index locations or returns an empty snapshot if it can't be read.
*
* @throws IOException if the index we try to read is corrupted
*/
public static MetadataSnapshot readMetadataSnapshot(Path[] indexLocations, ESLogger logger) throws IOException {
final Directory[] dirs = new Directory[indexLocations.length];
try {
for (int i=0; i< indexLocations.length; i++) {
for (int i = 0; i < indexLocations.length; i++) {
dirs[i] = new SimpleFSDirectory(indexLocations[i]);
}
DistributorDirectory dir = new DistributorDirectory(dirs);
@ -397,7 +379,7 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
* The returned IndexOutput might validate the file's checksum if the file has been written with a newer lucene version
* and the metadata holds the necessary information to detect that it was written by Lucene 4.8 or newer. If it has only
* a legacy checksum, the returned IndexOutput will not verify the checksum.
*
* <p/>
* Note: Checksums are calculated nevertheless since lucene does it by default since version 4.8.0. This method only adds the
* verification against the checksum in the given metadata and does not add any significant overhead.
*/
@ -420,16 +402,16 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
}
success = true;
} finally {
if (success == false) {
IOUtils.closeWhileHandlingException(output);
}
if (success == false) {
IOUtils.closeWhileHandlingException(output);
}
}
return output;
}
public static void verify(IndexOutput output) throws IOException {
if (output instanceof VerifyingIndexOutput) {
((VerifyingIndexOutput)output).verify();
((VerifyingIndexOutput) output).verify();
}
}
@ -445,7 +427,7 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
public static void verify(IndexInput input) throws IOException {
if (input instanceof VerifyingIndexInput) {
((VerifyingIndexInput)input).verify();
((VerifyingIndexInput) input).verify();
}
}
@ -505,7 +487,7 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
List<CorruptIndexException> ex = new ArrayList<>();
for (String file : files) {
if (file.startsWith(CORRUPTED)) {
try(ChecksumIndexInput input = directory.openChecksumInput(file, IOContext.READONCE)) {
try (ChecksumIndexInput input = directory.openChecksumInput(file, IOContext.READONCE)) {
int version = CodecUtil.checkHeader(input, CODEC, VERSION_START, VERSION);
String msg = input.readString();
StringBuilder builder = new StringBuilder(shardId.toString());
@ -531,9 +513,9 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
* legacy checksum file. After the delete it pulls the latest metadata snapshot from the store and compares it
* to the given snapshot. If the snapshots are inconsistent an illegal state exception is thrown
*
* @param reason the reason for this cleanup operation logged for each deleted file
* @param reason the reason for this cleanup operation logged for each deleted file
* @param sourceMetaData the metadata used for cleanup. all files in this metadata should be kept around.
* @throws IOException if an IOException occurs
* @throws IOException if an IOException occurs
* @throws ElasticsearchIllegalStateException if the latest snapshot in this store differs from the given one after the cleanup.
*/
public void cleanupAndVerify(String reason, MetadataSnapshot sourceMetaData) throws IOException {
@ -578,7 +560,7 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
// this check ensures that the two files are consistent, ie. if we don't have checksums only the rest needs to match; we are just
// verifying that we are consistent on both ends, source and target
final boolean hashAndLengthEqual = (
local.checksum() == null
local.checksum() == null
&& remote.checksum() == null
&& local.hash().equals(remote.hash())
&& local.length() == remote.length());
@ -591,7 +573,7 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
} else {
logger.debug("Files are missing on the recovery target: {} ", recoveryDiff);
throw new ElasticsearchIllegalStateException("Files are missing on the recovery target: [different="
+ recoveryDiff.different + ", missing=" + recoveryDiff.missing +']', null);
+ recoveryDiff.different + ", missing=" + recoveryDiff.missing + ']', null);
}
}
}
@ -647,7 +629,7 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
* Only files that are part of the last commit are considered in this data structure.
* For backwards compatibility the snapshot might include legacy checksums that
* are derived from a dedicated checksum file written by older elasticsearch versions pre 1.3
*
* <p/>
* Note: This class will ignore the <tt>segments.gen</tt> file since it's optional and might
* change concurrently for safety reasons.
*
@ -716,8 +698,8 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
// TODO we should check the checksum in lucene if we hit an exception
Lucene.checkSegmentInfoIntegrity(directory);
} catch (CorruptIndexException | IndexFormatTooOldException | IndexFormatTooNewException cex) {
cex.addSuppressed(ex);
throw cex;
cex.addSuppressed(ex);
throw cex;
} catch (Throwable e) {
// ignore...
}
@ -729,7 +711,7 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
/**
* Reads legacy checksum files found in the directory.
*
* <p/>
* Files are expected to start with the _checksums- prefix
* followed by a long file version. Only the file with the highest version is read; all other files are ignored.
*
@ -762,7 +744,7 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
/**
* Deletes all checksum files with version lower than newVersion.
*
* @param directory the directory to clean
* @param directory the directory to clean
* @param newVersion the latest checksum file version
* @throws IOException
*/
@ -783,7 +765,7 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
}
}
private static void checksumFromLuceneFile(Directory directory, String file, ImmutableMap.Builder<String, StoreFileMetaData> builder, ESLogger logger, Version version, boolean readFileAsHash) throws IOException {
private static void checksumFromLuceneFile(Directory directory, String file, ImmutableMap.Builder<String, StoreFileMetaData> builder, ESLogger logger, Version version, boolean readFileAsHash) throws IOException {
final String checksum;
final BytesRefBuilder fileHash = new BytesRefBuilder();
try (final IndexInput in = directory.openInput(file, IOContext.READONCE)) {
@ -824,7 +806,7 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
* Computes a strong hash value for small files. Note that this method should only be used for files < 1MB
*/
public static void hashFile(BytesRefBuilder fileHash, InputStream in, long size) throws IOException {
final int len = (int)Math.min(1024 * 1024, size); // for safety we limit this to 1MB
final int len = (int) Math.min(1024 * 1024, size); // for safety we limit this to 1MB
fileHash.grow(len);
fileHash.setLength(len);
final int readBytes = Streams.readFully(in, fileHash.bytes(), 0, len);
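A hedged usage sketch of hashFile; wrapping the IndexInput in InputStreamIndexInput is an assumption about the available adapter (it mirrors how checksumFromLuceneFile feeds this method), and the file name is illustrative:

import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.util.BytesRefBuilder;
import org.elasticsearch.common.lucene.store.InputStreamIndexInput;

BytesRefBuilder fileHash = new BytesRefBuilder();
try (IndexInput in = directory.openInput("_0.si", IOContext.READONCE)) {
    final long length = in.length();
    Store.hashFile(fileHash, new InputStreamIndexInput(in, length), length);
}
// fileHash.get() now holds a strong identity hash; only meaningful for files < 1MB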
@ -853,38 +835,38 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
/**
* Returns a diff between the two snapshots that can be used for recovery. The given snapshot is treated as the
* recovery target and this snapshot as the source. The returned diff will hold a list of files that are:
* <ul>
* <li>identical: they exist in both snapshots and they can be considered the same ie. they don't need to be recovered</li>
* <li>different: they exist in both snapshots but they are not identical</li>
* <li>missing: files that exist in the source but not in the target</li>
* </ul>
* <ul>
* <li>identical: they exist in both snapshots and they can be considered the same ie. they don't need to be recovered</li>
* <li>different: they exist in both snapshots but they are not identical</li>
* <li>missing: files that exist in the source but not in the target</li>
* </ul>
* This method groups files into per-segment files and per-commit files. A file is treated as
* identical if and only if all files in its group are identical. On a per-segment level files for a segment are treated
* as identical iff:
* <ul>
* <li>all files in this segment have the same checksum</li>
* <li>all files in this segment have the same length</li>
* <li>the segments <tt>.si</tt> files hashes are byte-identical. Note: This is using a perfect hash function; the metadata transfers the <tt>.si</tt> file content as its hash</li>
* <li>all files in this segment have the same checksum</li>
* <li>all files in this segment have the same length</li>
* <li>the segments <tt>.si</tt> files hashes are byte-identical. Note: This is using a perfect hash function; the metadata transfers the <tt>.si</tt> file content as its hash</li>
* </ul>
*
* <p/>
* The <tt>.si</tt> file contains a lot of diagnostics, including a timestamp. In the future there might be
* unique segment identifiers in there, hardening this method further.
*
* <p/>
* The per-commit files are handled very similarly. A commit is composed of the <tt>segments_N</tt> files as well as generational files like
* deletes (<tt>_x_y.del</tt>) or field-info (<tt>_x_y.fnm</tt>) files. On a per-commit level files for a commit are treated
* as identical iff:
* <ul>
* <li>all files belonging to this commit have the same checksum</li>
* <li>all files belonging to this commit have the same length</li>
* <li>the segments file <tt>segments_N</tt> files hashes are byte-identical. Note: This is using a perfect hash function; the metadata transfers the <tt>segments_N</tt> file content as its hash</li>
* <li>all files belonging to this commit have the same checksum</li>
* <li>all files belonging to this commit have the same length</li>
* <li>the segments file <tt>segments_N</tt> files hashes are byte-identical. Note: This is using a perfect hash function; the metadata transfers the <tt>segments_N</tt> file content as its hash</li>
* </ul>
*
* <p/>
* NOTE: this diff will not contain the <tt>segments.gen</tt> file. This file is omitted on recovery.
*/
public RecoveryDiff recoveryDiff(MetadataSnapshot recoveryTargetSnapshot) {
final ImmutableList.Builder<StoreFileMetaData> identical = ImmutableList.builder();
final ImmutableList.Builder<StoreFileMetaData> different = ImmutableList.builder();
final ImmutableList.Builder<StoreFileMetaData> missing = ImmutableList.builder();
final ImmutableList.Builder<StoreFileMetaData> identical = ImmutableList.builder();
final ImmutableList.Builder<StoreFileMetaData> different = ImmutableList.builder();
final ImmutableList.Builder<StoreFileMetaData> missing = ImmutableList.builder();
final Map<String, List<StoreFileMetaData>> perSegment = new HashMap<>();
final List<StoreFileMetaData> perCommitStoreFiles = new ArrayList<>();
@ -896,7 +878,7 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
final String extension = IndexFileNames.getExtension(meta.name());
assert FIELD_INFOS_FILE_EXTENSION.equals(extension) == false || IndexFileNames.stripExtension(IndexFileNames.stripSegmentName(meta.name())).isEmpty() : "FieldInfos are generational but updateable DV are not supported in elasticsearch";
if (IndexFileNames.SEGMENTS.equals(segmentId) || DEL_FILE_EXTENSION.equals(extension) || LIV_FILE_EXTENSION.equals(extension)) {
// only treat del files as per-commit files fnm files are generational but only for upgradable DV
// only treat del files as per-commit files fnm files are generational but only for upgradable DV
perCommitStoreFiles.add(meta);
} else {
List<StoreFileMetaData> perSegStoreFiles = perSegment.get(segmentId);
@ -931,8 +913,8 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
}
}
RecoveryDiff recoveryDiff = new RecoveryDiff(identical.build(), different.build(), missing.build());
assert recoveryDiff.size() == this.metadata.size() - (metadata.containsKey(IndexFileNames.OLD_SEGMENTS_GEN) ? 1: 0)
: "some files are missing recoveryDiff size: [" + recoveryDiff.size() + "] metadata size: [" + this.metadata.size() + "] contains segments.gen: [" + metadata.containsKey(IndexFileNames.OLD_SEGMENTS_GEN) + "]" ;
assert recoveryDiff.size() == this.metadata.size() - (metadata.containsKey(IndexFileNames.OLD_SEGMENTS_GEN) ? 1 : 0)
: "some files are missing recoveryDiff size: [" + recoveryDiff.size() + "] metadata size: [" + this.metadata.size() + "] contains segments.gen: [" + metadata.containsKey(IndexFileNames.OLD_SEGMENTS_GEN) + "]";
return recoveryDiff;
}
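A hedged usage sketch of recoveryDiff based on the javadoc above (variable names and the transfer steps are assumptions):

Store.MetadataSnapshot source = sourceStore.getMetadata();        // recovery source, e.g. the primary
Store.MetadataSnapshot target = targetStore.getMetadataOrEmpty(); // recovery target, e.g. a replica
Store.RecoveryDiff diff = source.recoveryDiff(target);
// diff.identical: already on the target, nothing to do
for (StoreFileMetaData meta : diff.different) { /* re-transfer changed file meta.name() */ }
for (StoreFileMetaData meta : diff.missing) { /* transfer file absent on the target */ }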
@ -978,11 +960,12 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
/**
* A class representing the diff between a recovery source and recovery target
*
* @see MetadataSnapshot#recoveryDiff(org.elasticsearch.index.store.Store.MetadataSnapshot)
*/
public static final class RecoveryDiff {
/**
* Files that exist in both snapshots and they can be considered the same ie. they don't need to be recovered
* Files that exist in both snapshots and they can be considered the same ie. they don't need to be recovered
*/
public final List<StoreFileMetaData> identical;
/**
@ -1122,7 +1105,7 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
public void writeBytes(byte[] b, int offset, int length) throws IOException {
if (writtenBytes + length > checksumPosition && actualChecksum == null) {
assert writtenBytes <= checksumPosition;
final int bytesToWrite = (int)(checksumPosition-writtenBytes);
final int bytesToWrite = (int) (checksumPosition - writtenBytes);
out.writeBytes(b, offset, bytesToWrite);
readAndCompareChecksum();
offset += bytesToWrite;
@ -1137,7 +1120,7 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
/**
* Index input that calculates checksum as data is read from the input.
*
* <p/>
* This class supports random access (it is possible to seek backward and forward) in order to accommodate the retry
* mechanism that is used in some repository plugins (S3 for example). However, the checksum is only calculated on
* the first read. All consecutive reads of the same data are not used to calculate the checksum.
@ -1183,7 +1166,7 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
input.readBytes(b, offset, len);
if (pos + len > verifiedPosition) {
// Conversion to int is safe here because (verifiedPosition - pos) can be at most len, which is integer
int alreadyVerified = (int)Math.max(0, verifiedPosition - pos);
int alreadyVerified = (int) Math.max(0, verifiedPosition - pos);
if (pos < checksumPosition) {
if (pos + len < checksumPosition) {
digest.update(b, offset + alreadyVerified, len - alreadyVerified);
@ -1285,7 +1268,7 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
ensureOpen();
if (!isMarkedCorrupted()) {
String uuid = CORRUPTED + Strings.randomBase64UUID();
try(IndexOutput output = this.directory().createOutput(uuid, IOContext.DEFAULT)) {
try (IndexOutput output = this.directory().createOutput(uuid, IOContext.DEFAULT)) {
CodecUtil.writeHeader(output, CODEC, VERSION);
output.writeString(ExceptionsHelper.detailedMessage(exception, true, 0)); // handles null exception
output.writeString(ExceptionsHelper.stackTrace(exception));
@ -1296,17 +1279,4 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
directory().sync(Collections.singleton(uuid));
}
}
/**
* A listener that is called once this store is closed and all references are released
*/
public static interface OnCloseListener {
/**
* Called once the store is closed and all references are released.
*
* @param shardId the shard ID the calling store belongs to.
*/
public void onClose(ShardId shardId);
}
}


@ -35,7 +35,6 @@ import org.elasticsearch.common.inject.*;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.gateway.Gateway;
import org.elasticsearch.index.*;
import org.elasticsearch.index.aliases.IndexAliasesServiceModule;
import org.elasticsearch.index.analysis.AnalysisModule;
@ -133,23 +132,11 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService> i
@Override
public void run() {
try {
removeIndex(index, "shutdown", false, new IndexCloseListener() {
@Override
public void onAllShardsClosed(Index index, List<Throwable> failures) {
latch.countDown();
}
@Override
public void onShardClosed(ShardId shardId) {
}
@Override
public void onShardCloseFailed(ShardId shardId, Throwable t) {
}
});
removeIndex(index, "shutdown", false);
} catch (Throwable e) {
latch.countDown();
logger.warn("failed to delete index on stop [" + index + "]", e);
} finally {
latch.countDown();
}
}
});
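The rewritten shutdown task counts the latch down in a finally block, so a throwing removeIndex can no longer leave node stop hanging. A hedged sketch of the surrounding pattern (the latch, loop, and executor are assumptions about context this diff elides):

final CountDownLatch latch = new CountDownLatch(indices.size());
for (final String index : indices) {
    executor.execute(new Runnable() {
        @Override
        public void run() {
            try {
                removeIndex(index, "shutdown", false);
            } catch (Throwable e) {
                logger.warn("failed to delete index on stop [" + index + "]", e);
            } finally {
                latch.countDown(); // runs on success and on failure alike
            }
        }
    });
}
latch.await(); // node stop blocks until every index is removed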
@ -344,7 +331,7 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService> i
* @param reason the high level reason causing this removal
*/
public void removeIndex(String index, String reason) throws ElasticsearchException {
removeIndex(index, reason, false, null);
removeIndex(index, reason, false);
}
/**
@ -357,40 +344,10 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService> i
* @param reason the high level reason causing this delete
*/
public void deleteIndex(String index, String reason) throws ElasticsearchException {
removeIndex(index, reason, true, new IndexCloseListener() {
@Override
public void onAllShardsClosed(Index index, List<Throwable> failures) {
try {
nodeEnv.deleteIndexDirectorySafe(index);
logger.debug("deleted index [{}] from filesystem - failures {}", index, failures);
} catch (Exception e) {
for (Throwable t : failures) {
e.addSuppressed(t);
}
logger.debug("failed to deleted index [{}] from filesystem", e, index);
// ignore - still some shards locked here
}
}
@Override
public void onShardClosed(ShardId shardId) {
try {
// this is called under the shard lock - we can safely delete it
IOUtils.rm(nodeEnv.shardPaths(shardId));
logger.debug("deleted shard [{}] from filesystem", shardId);
} catch (IOException e) {
logger.warn("Can't delete shard {} ", e, shardId);
}
}
@Override
public void onShardCloseFailed(ShardId shardId, Throwable t) {
}
});
removeIndex(index, reason, true);
}
private void removeIndex(String index, String reason, boolean delete, @Nullable IndexCloseListener listener) throws ElasticsearchException {
private void removeIndex(String index, String reason, boolean delete) throws ElasticsearchException {
try {
final IndexService indexService;
final Injector indexInjector;
@ -418,7 +375,7 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService> i
}));
logger.debug("[{}] closing index service (reason [{}])", index, reason);
((IndexService) indexService).close(reason, listener);
indexService.close(reason);
logger.debug("[{}] closing index cache (reason [{}])", index, reason);
indexInjector.getInstance(IndexCache.class).close();
@ -468,33 +425,4 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService> i
}
}
}
/**
* A listener interface that can be used to get notification once a shard or all shards
* of an certain index that are allocated on a node are actually closed. The listener methods
* are invoked once the actual low level instance modifying or reading a shard are closed in contrast to
* removal methods that might return earlier.
*/
public static interface IndexCloseListener {
/**
* Invoked once all shards are closed or their closing failed.
* @param index the index that got closed
* @param failures the recorded shard closing failures
*/
public void onAllShardsClosed(Index index, List<Throwable> failures);
/**
* Invoked once the last resource using the given shard ID is released.
* Yet, this method is called while still holding the shards lock such that
* operations on the shards data can safely be executed in this callback.
*/
public void onShardClosed(ShardId shardId);
/**
* Invoked if closing the given shard failed.
*/
public void onShardCloseFailed(ShardId shardId, Throwable t);
}
}


@ -23,6 +23,7 @@ import org.apache.lucene.util.IOUtils;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.ElasticsearchTestCase;
@ -37,6 +38,7 @@ import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
public class NodeEnvironmentTests extends ElasticsearchTestCase {
@ -99,7 +101,7 @@ public class NodeEnvironmentTests extends ElasticsearchTestCase {
}
try {
env.lockAllForIndex(new Index("foo"));
env.lockAllForIndex(new Index("foo"), randomIntBetween(0, 10));
fail("shard 1 is locked");
} catch (LockObtainFailedException ex) {
// expected
@ -109,7 +111,7 @@ public class NodeEnvironmentTests extends ElasticsearchTestCase {
// can lock again?
env.shardLock(new ShardId("foo", 1)).close();
List<ShardLock> locks = env.lockAllForIndex(new Index("foo"));
List<ShardLock> locks = env.lockAllForIndex(new Index("foo"), randomIntBetween(0, 10));
try {
env.shardLock(new ShardId("foo", randomBoolean() ? 1 : 2));
fail("shard is locked");
@ -172,7 +174,7 @@ public class NodeEnvironmentTests extends ElasticsearchTestCase {
}
try {
env.deleteIndexDirectorySafe(new Index("foo"));
env.deleteIndexDirectorySafe(new Index("foo"), randomIntBetween(0, 10));
fail("shard is locked");
} catch (LockObtainFailedException ex) {
// expected
@ -183,7 +185,27 @@ public class NodeEnvironmentTests extends ElasticsearchTestCase {
assertTrue(Files.exists(path));
}
env.deleteIndexDirectorySafe(new Index("foo"));
final AtomicReference<Throwable> threadException = new AtomicReference<>();
if (randomBoolean()) {
Thread t = new Thread(new AbstractRunnable() {
@Override
public void onFailure(Throwable t) {
threadException.set(t);
}
@Override
protected void doRun() throws Exception {
try (ShardLock fooLock = env.shardLock(new ShardId("foo", 1))) {
Thread.sleep(100);
}
}
});
t.start();
}
env.deleteIndexDirectorySafe(new Index("foo"), 5000);
assertNull(threadException.get());
for (Path path : env.indexPaths(new Index("foo"))) {
assertFalse(Files.exists(path));


@ -19,15 +19,19 @@
package org.elasticsearch.index.store;
import org.apache.lucene.analysis.MockAnalyzer;
import org.apache.lucene.codecs.*;
import org.apache.lucene.codecs.CodecUtil;
import org.apache.lucene.codecs.FilterCodec;
import org.apache.lucene.codecs.SegmentInfoFormat;
import org.apache.lucene.codecs.lucene50.Lucene50Codec;
import org.apache.lucene.codecs.lucene50.Lucene50SegmentInfoFormat;
import org.apache.lucene.document.*;
import org.apache.lucene.index.*;
import org.apache.lucene.store.*;
import org.apache.lucene.util.*;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.TestUtil;
import org.apache.lucene.util.Version;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.env.ShardLock;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.distributor.Distributor;
@ -35,6 +39,7 @@ import org.elasticsearch.index.store.distributor.LeastUsedDistributor;
import org.elasticsearch.index.store.distributor.RandomWeightedDistributor;
import org.elasticsearch.test.DummyShardLock;
import org.elasticsearch.test.ElasticsearchLuceneTestCase;
import org.hamcrest.Matchers;
import org.junit.Test;
import java.io.FileNotFoundException;
@ -71,13 +76,7 @@ public class StoreTest extends ElasticsearchLuceneTestCase {
store.incRef();
final AtomicBoolean called = new AtomicBoolean(false);
Store.OnCloseListener listener = new Store.OnCloseListener() {
@Override
public void onClose(ShardId shardId) {
assertTrue(called.compareAndSet(false, true));
}
};
store.close(listener);
store.close();
for (int i = 0; i < incs; i++) {
if (randomBoolean()) {
store.incRef();
@ -92,9 +91,8 @@ public class StoreTest extends ElasticsearchLuceneTestCase {
store.ensureOpen();
}
assertFalse(called.get());
store.decRef();
assertTrue(called.get());
assertThat(store.refCount(), Matchers.equalTo(0));
assertFalse(store.tryIncRef());
try {
store.incRef();
@ -110,27 +108,6 @@ public class StoreTest extends ElasticsearchLuceneTestCase {
}
}
@Test
public void testListenerCanThrowException() throws IOException {
final ShardId shardId = new ShardId(new Index("index"), 1);
DirectoryService directoryService = new LuceneManagedDirectoryService(random());
final ShardLock shardLock = new DummyShardLock(shardId);
Store store = new Store(shardId, ImmutableSettings.EMPTY, directoryService, randomDistributor(directoryService), shardLock);
final AtomicBoolean called = new AtomicBoolean(false);
Store.OnCloseListener listener = new Store.OnCloseListener() {
@Override
public void onClose(ShardId shardId) {
assertTrue(called.compareAndSet(false, true));
throw new RuntimeException("foobar");
}
};
assertTrue(shardLock.isOpen());
store.close(listener);
assertTrue(called.get());
assertFalse(shardLock.isOpen());
// test will barf if the directory is not closed
}
@Test
public void testVerifyingIndexOutput() throws IOException {
Directory dir = newDirectory();