[CORE] Delete shard content under lock

Once we delete the the index on a node we are closing all resources
and subsequently need to delete all shards contents from disk. Yet
this happens today under a lock (the shard lock) that needs to be
acquried in order to execute any operation on the shards data
path. We try to delete all the index meta-data once we acquired
all the shard lock but this operation can run into a timeout which causes
the index to remain on disk. Further, all shard data will be left on
disk if the timeout is reached.

This commit removes all the shards data just before the shard lock
is release as the last operation on a shard that belongs to a deleted
index.
This commit is contained in:
Simon Willnauer 2014-12-29 15:49:35 +01:00
parent f7f99b8dbf
commit 7ec8973fbc
31 changed files with 279 additions and 235 deletions

View File

@ -115,7 +115,7 @@ public class TransportClusterStatsAction extends TransportNodesOperationAction<C
NodeInfo nodeInfo = nodeService.info(false, true, false, true, false, false, true, false, true);
NodeStats nodeStats = nodeService.stats(CommonStatsFlags.NONE, false, true, true, false, false, true, false, false, false);
List<ShardStats> shardsStats = new ArrayList<>();
for (IndexService indexService : indicesService.indices().values()) {
for (IndexService indexService : indicesService) {
for (IndexShard indexShard : indexService) {
if (indexShard.routingEntry() != null && indexShard.routingEntry().active()) {
// only report on fully started shards

View File

@ -153,7 +153,7 @@ public class TransportRecoveryAction extends
if (state == null) {
IndexShardGatewayService gatewayService =
indexService.shardInjector(request.shardId().id()).getInstance(IndexShardGatewayService.class);
indexService.shardInjectorSafe(request.shardId().id()).getInstance(IndexShardGatewayService.class);
state = gatewayService.recoveryState();
}

View File

@ -1,30 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.component;
import org.elasticsearch.ElasticsearchException;
/**
*
*/
public interface CloseableComponent {
void close() throws ElasticsearchException;
}

View File

@ -20,11 +20,14 @@
package org.elasticsearch.common.component;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.lease.Releasable;
import java.io.Closeable;
/**
*
*/
public interface LifecycleComponent<T> extends CloseableComponent {
public interface LifecycleComponent<T> extends Releasable {
Lifecycle.State lifecycleState();

View File

@ -230,11 +230,6 @@ public class Injectors {
return keyType;
}
public static void close(Injector injector) {
}
public static void cleanCaches(Injector injector) {
((InjectorImpl) injector).clearCache();
if (injector.getParent() != null) {

View File

@ -182,15 +182,38 @@ public class NodeEnvironment extends AbstractComponent implements Closeable{
assert indexSettings != ImmutableSettings.EMPTY;
final Path[] paths = shardPaths(shardId);
logger.trace("deleting shard {} directory, paths: [{}]", shardId, paths);
try (Closeable lock = shardLock(shardId)) {
IOUtils.rm(paths);
if (hasCustomDataPath(indexSettings)) {
Path customLocation = resolveCustomLocation(indexSettings, shardId);
logger.trace("deleting custom shard {} directory [{}]", shardId, customLocation);
IOUtils.rm(customLocation);
}
logger.trace("deleted shard {} directory, paths: [{}]", shardId, paths);
assert FileSystemUtils.exists(paths) == false;
try (ShardLock lock = shardLock(shardId)) {
deleteShardDirectoryUnderLock(lock, indexSettings);
}
}
/**
* Deletes a shard data directory. Note: this method assumes that the shard lock is acquired
*
* @param lock the shards lock
* @throws IOException if an IOException occurs
*/
public void deleteShardDirectoryUnderLock(ShardLock lock, @IndexSettings Settings indexSettings) throws IOException {
assert indexSettings != ImmutableSettings.EMPTY;
final ShardId shardId = lock.getShardId();
assert isShardLocked(shardId) : "shard " + shardId + " is not locked";
final Path[] paths = shardPaths(shardId);
IOUtils.rm(paths);
if (hasCustomDataPath(indexSettings)) {
Path customLocation = resolveCustomLocation(indexSettings, shardId);
logger.trace("deleting custom shard {} directory [{}]", shardId, customLocation);
IOUtils.rm(customLocation);
}
logger.trace("deleted shard {} directory, paths: [{}]", shardId, paths);
assert FileSystemUtils.exists(paths) == false;
}
private boolean isShardLocked(ShardId id) {
try {
shardLock(id, 0).close();
return false;
} catch (IOException ex) {
return true;
}
}

View File

@ -192,10 +192,6 @@ public class GatewayMetaState extends AbstractComponent implements ClusterStateL
return loadState();
}
public boolean isDangling(String index) {
return danglingIndices.containsKey(index);
}
@Override
public void clusterChanged(ClusterChangedEvent event) {
if (event.state().blocks().disableStatePersistence()) {

View File

@ -19,14 +19,16 @@
package org.elasticsearch.index;
import com.google.common.base.Function;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.UnmodifiableIterator;
import com.google.common.collect.Iterators;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.inject.*;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.NodeEnvironment;
@ -79,7 +81,8 @@ import org.elasticsearch.plugins.ShardsPluginsModule;
import java.io.Closeable;
import java.io.IOException;
import java.util.Map;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@ -122,11 +125,10 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone
private final NodeEnvironment nodeEnv;
private volatile ImmutableMap<Integer, Injector> shardsInjectors = ImmutableMap.of();
private volatile ImmutableMap<Integer, IndexShard> shards = ImmutableMap.of();
private volatile ImmutableMap<Integer, Tuple<IndexShard, Injector>> shards = ImmutableMap.of();
private final AtomicBoolean closed = new AtomicBoolean(false);
private final AtomicBoolean deleted = new AtomicBoolean(false);
@Inject
public IndexService(Injector injector, Index index, @IndexSettings Settings indexSettings, NodeEnvironment nodeEnv,
@ -163,8 +165,13 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone
}
@Override
public UnmodifiableIterator<IndexShard> iterator() {
return shards.values().iterator();
public Iterator<IndexShard> iterator() {
return Iterators.transform(shards.values().iterator(), new Function<Tuple<IndexShard, Injector>, IndexShard>() {
@Override
public IndexShard apply(Tuple<IndexShard, Injector> input) {
return input.v1();
}
});
}
public boolean hasShard(int shardId) {
@ -176,7 +183,11 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone
*/
@Nullable
public IndexShard shard(int shardId) {
return shards.get(shardId);
Tuple<IndexShard, Injector> indexShardInjectorTuple = shards.get(shardId);
if (indexShardInjectorTuple != null) {
return indexShardInjectorTuple.v1();
}
return null;
}
/**
* Return the shard with the provided id, or throw an exception if it doesn't exist.
@ -189,7 +200,7 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone
return indexShard;
}
public ImmutableSet<Integer> shardIds() {
public Set<Integer> shardIds() {
return shards.keySet();
}
@ -237,8 +248,9 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone
return aliasesService;
}
public synchronized void close(final String reason) {
public synchronized void close(final String reason, boolean delete) {
if (closed.compareAndSet(false, true)) {
deleted.compareAndSet(false, delete);
final Set<Integer> shardIds = shardIds();
for (final int shardId : shardIds) {
try {
@ -250,23 +262,15 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone
}
}
/**
* Return the shard injector for the provided id, or null if there is no such shard.
*/
@Nullable
public Injector shardInjector(int shardId) throws ElasticsearchException {
return shardsInjectors.get(shardId);
}
/**
* Return the shard injector for the provided id, or throw an exception if there is no such shard.
*/
public Injector shardInjectorSafe(int shardId) throws IndexShardMissingException {
Injector shardInjector = shardInjector(shardId);
if (shardInjector == null) {
Tuple<IndexShard, Injector> tuple = shards.get(shardId);
if (tuple == null) {
throw new IndexShardMissingException(new ShardId(index, shardId));
}
return shardInjector;
return tuple.v2();
}
public String indexUUID() {
@ -282,12 +286,13 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone
if (closed.get()) {
throw new ElasticsearchIllegalStateException("Can't create shard [" + index.name() + "][" + sShardId + "], closed");
}
ShardId shardId = new ShardId(index, sShardId);
final ShardId shardId = new ShardId(index, sShardId);
ShardLock lock = null;
boolean success = false;
Injector shardInjector = null;
try {
lock = nodeEnv.shardLock(shardId, TimeUnit.SECONDS.toMillis(5));
if (shardsInjectors.containsKey(shardId.id())) {
if (shards.containsKey(shardId.id())) {
throw new IndexShardAlreadyExistsException(shardId + " already exists");
}
@ -301,7 +306,8 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone
modules.add(new ShardIndexingModule());
modules.add(new ShardSearchModule());
modules.add(new ShardGetModule());
modules.add(new StoreModule(indexSettings, injector.getInstance(IndexStore.class), lock));
modules.add(new StoreModule(indexSettings, injector.getInstance(IndexStore.class), lock,
new StoreCloseListener(shardId)));
modules.add(new DeletionPolicyModule(indexSettings));
modules.add(new MergePolicyModule(indexSettings));
modules.add(new MergeSchedulerModule(indexSettings));
@ -316,8 +322,6 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone
modules.add(new ShardTermVectorsModule());
modules.add(new IndexShardSnapshotModule());
modules.add(new SuggestShardModule());
Injector shardInjector;
try {
shardInjector = modules.createChildInjector(injector);
} catch (CreationException e) {
@ -326,13 +330,11 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone
throw new IndexShardCreationException(shardId, e);
}
shardsInjectors = newMapBuilder(shardsInjectors).put(shardId.id(), shardInjector).immutableMap();
IndexShard indexShard = shardInjector.getInstance(IndexShard.class);
indicesLifecycle.indexShardStateChanged(indexShard, null, "shard created");
indicesLifecycle.afterIndexShardCreated(indexShard);
shards = newMapBuilder(shards).put(shardId.id(), indexShard).immutableMap();
shards = newMapBuilder(shards).put(shardId.id(), new Tuple<>(indexShard, shardInjector)).immutableMap();
success = true;
return indexShard;
} catch (IOException ex) {
@ -340,26 +342,34 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone
} finally {
if (success == false) {
IOUtils.closeWhileHandlingException(lock);
if (shardInjector != null) {
IndexShard indexShard = shardInjector.getInstance(IndexShard.class);
closeShardInjector("initialization failed", shardId, shardInjector, indexShard);
}
}
}
}
public synchronized void removeShard(int shardId, String reason) throws ElasticsearchException {
final ShardId sId = new ShardId(index, shardId);
try {
final Injector shardInjector;
final IndexShard indexShard;
Map<Integer, Injector> tmpShardInjectors = newHashMap(shardsInjectors);
shardInjector = tmpShardInjectors.remove(shardId);
if (shardInjector == null) {
return;
}
final Injector shardInjector;
final IndexShard indexShard;
if (shards.containsKey(shardId) == false) {
return;
}
logger.debug("[{}] closing... (reason: [{}])", shardId, reason);
HashMap<Integer, Tuple<IndexShard, Injector>> tmpShardsMap = newHashMap(shards);
Tuple<IndexShard, Injector> tuple = tmpShardsMap.remove(shardId);
indexShard = tuple.v1();
shardInjector = tuple.v2();
shards = ImmutableMap.copyOf(tmpShardsMap);
closeShardInjector(reason, sId, shardInjector, indexShard);
logger.debug("[{}] closed (reason: [{}])", shardId, reason);
}
logger.debug("[{}] closing... (reason: [{}])", shardId, reason);
shardsInjectors = ImmutableMap.copyOf(tmpShardInjectors);
Map<Integer, IndexShard> tmpShardsMap = newHashMap(shards);
indexShard = tmpShardsMap.remove(shardId);
shards = ImmutableMap.copyOf(tmpShardsMap);
private void closeShardInjector(String reason, ShardId sId, Injector shardInjector, IndexShard indexShard) {
final int shardId = sId.id();
try {
indicesLifecycle.beforeIndexShardClosed(sId, indexShard);
for (Class<? extends Closeable> closeable : pluginsService.shardServices()) {
try {
@ -368,78 +378,76 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone
logger.debug("[{}] failed to clean plugin shard service [{}]", e, shardId, closeable);
}
}
try {
// now we can close the translog service, we need to close it before the we close the shard
shardInjector.getInstance(TranslogService.class).close();
} catch (Throwable e) {
logger.debug("[{}] failed to close translog service", e, shardId);
// ignore
}
// now we can close the translog service, we need to close it before the we close the shard
closeInjectorResource(sId, shardInjector, TranslogService.class);
// this logic is tricky, we want to close the engine so we rollback the changes done to it
// and close the shard so no operations are allowed to it
if (indexShard != null) {
try {
((IndexShard) indexShard).close(reason);
indexShard.close(reason);
} catch (Throwable e) {
logger.debug("[{}] failed to close index shard", e, shardId);
// ignore
}
}
try {
shardInjector.getInstance(Engine.class).close();
} catch (Throwable e) {
logger.debug("[{}] failed to close engine", e, shardId);
// ignore
}
try {
shardInjector.getInstance(MergeSchedulerProvider.class).close();
} catch (Throwable e) {
logger.debug("[{}] failed to close merge policy scheduler", e, shardId);
// ignore
}
try {
shardInjector.getInstance(MergePolicyProvider.class).close();
} catch (Throwable e) {
logger.debug("[{}] failed to close merge policy provider", e, shardId);
// ignore
}
try {
shardInjector.getInstance(IndexShardGatewayService.class).close();
} catch (Throwable e) {
logger.debug("[{}] failed to close index shard gateway", e, shardId);
// ignore
}
try {
// now we can close the translog
shardInjector.getInstance(Translog.class).close();
} catch (Throwable e) {
logger.debug("[{}] failed to close translog", e, shardId);
// ignore
}
try {
// now we can close the translog
shardInjector.getInstance(PercolatorQueriesRegistry.class).close();
} catch (Throwable e) {
logger.debug("[{}] failed to close PercolatorQueriesRegistry", e, shardId);
// ignore
}
closeInjectorResource(sId, shardInjector,
Engine.class,
MergeSchedulerProvider.class,
MergePolicyProvider.class,
IndexShardGatewayService.class,
Translog.class,
PercolatorQueriesRegistry.class);
// call this before we close the store, so we can release resources for it
indicesLifecycle.afterIndexShardClosed(sId, indexShard);
// if we delete or have no gateway or the store is not persistent, clean the store...
final Store store = shardInjector.getInstance(Store.class);
// and close it
} finally {
try {
store.close();
shardInjector.getInstance(Store.class).close();
} catch (Throwable e) {
logger.warn("[{}] failed to close store on shard deletion", e, shardId);
logger.warn("[{}] failed to close store on shard removal (reason: [{}])", e, shardId, reason);
}
Injectors.close(injector);
}
}
logger.debug("[{}] closed (reason: [{}])", shardId, reason);
} catch (Throwable t) {
throw t;
/**
* This method gets an instance for each of the given classes passed and calls #close() on the returned instance.
* NOTE: this method swallows all exceptions thrown from the close method of the injector and logs them as debug log
*/
private void closeInjectorResource(ShardId shardId, Injector shardInjector, Class<? extends Closeable>... toClose) {
for (Class<? extends Closeable> closeable : toClose) {
try {
final Closeable instance = shardInjector.getInstance(closeable);
if (instance == null) {
throw new NullPointerException("No instance available for " + closeable.getName());
}
IOUtils.close(instance);
} catch (Throwable t) {
logger.debug("{} failed to close {}", t, shardId, Strings.toUnderscoreCase(closeable.getSimpleName()));
}
}
}
private void onShardClose(ShardLock lock) {
if (deleted.get()) { // we remove that shards content if this index has been deleted
try {
nodeEnv.deleteShardDirectoryUnderLock(lock, indexSettings);
} catch (IOException e) {
logger.warn("{} failed to delete shard content", e, lock.getShardId());
}
}
}
private class StoreCloseListener implements Store.OnClose {
private final ShardId shardId;
public StoreCloseListener(ShardId shardId) {
this.shardId = shardId;
}
@Override
public void handle(ShardLock lock) {
assert lock.getShardId().equals(shardId) : "shard Id mismatch, expected: " + shardId + " but got: " + lock.getShardId();
onShardClose(lock);
}
}
}

View File

@ -26,7 +26,6 @@ import org.elasticsearch.Version;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.component.CloseableComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
@ -35,6 +34,7 @@ import org.elasticsearch.index.Index;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.indices.analysis.IndicesAnalysisService;
import java.io.Closeable;
import java.util.Map;
import static com.google.common.collect.Maps.newHashMap;
@ -42,7 +42,7 @@ import static com.google.common.collect.Maps.newHashMap;
/**
*
*/
public class AnalysisService extends AbstractIndexComponent implements CloseableComponent {
public class AnalysisService extends AbstractIndexComponent implements Closeable {
private final ImmutableMap<String, NamedAnalyzer> analyzers;
private final ImmutableMap<String, TokenizerFactory> tokenizers;

View File

@ -19,12 +19,12 @@
package org.elasticsearch.index.cache;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.component.CloseableComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.AbstractIndexComponent;
@ -34,10 +34,13 @@ import org.elasticsearch.index.cache.filter.FilterCache;
import org.elasticsearch.index.cache.query.parser.QueryParserCache;
import org.elasticsearch.index.settings.IndexSettings;
import java.io.Closeable;
import java.io.IOException;
/**
*
*/
public class IndexCache extends AbstractIndexComponent implements CloseableComponent, ClusterStateListener {
public class IndexCache extends AbstractIndexComponent implements Closeable, ClusterStateListener {
private final FilterCache filterCache;
private final QueryParserCache queryParserCache;
@ -77,10 +80,8 @@ public class IndexCache extends AbstractIndexComponent implements CloseableCompo
}
@Override
public void close() throws ElasticsearchException {
filterCache.close();
queryParserCache.close();
bitsetFilterCache.close();
public void close() throws IOException {
IOUtils.close(filterCache, queryParserCache, bitsetFilterCache);
if (clusterService != null) {
clusterService.remove(this);
}

View File

@ -33,7 +33,6 @@ import org.apache.lucene.util.SparseFixedBitSet;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.component.CloseableComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.lucene.search.NoCacheFilter;
import org.elasticsearch.common.settings.Settings;
@ -52,6 +51,7 @@ import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.indices.warmer.IndicesWarmer;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.Closeable;
import java.io.IOException;
import java.util.HashSet;
import java.util.Map;
@ -68,7 +68,7 @@ import java.util.concurrent.Executor;
* and require that it should always be around should use this cache, otherwise the
* {@link org.elasticsearch.index.cache.filter.FilterCache} should be used instead.
*/
public class BitsetFilterCache extends AbstractIndexComponent implements LeafReader.CoreClosedListener, RemovalListener<Object, Cache<Filter, BitsetFilterCache.Value>>, CloseableComponent {
public class BitsetFilterCache extends AbstractIndexComponent implements LeafReader.CoreClosedListener, RemovalListener<Object, Cache<Filter, BitsetFilterCache.Value>>, Closeable {
public static final String LOAD_RANDOM_ACCESS_FILTERS_EAGERLY = "index.load_fixed_bitset_filters_eagerly";

View File

@ -22,15 +22,16 @@ package org.elasticsearch.index.cache.filter;
import org.apache.lucene.search.Filter;
import org.apache.lucene.search.FilterCachingPolicy;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.component.CloseableComponent;
import org.elasticsearch.common.lucene.HashedBytesRef;
import org.elasticsearch.index.IndexComponent;
import org.elasticsearch.index.IndexService;
import java.io.Closeable;
/**
*
*/
public interface FilterCache extends IndexComponent, CloseableComponent {
public interface FilterCache extends IndexComponent, Closeable {
static class EntriesStats {
public final long sizeInBytes;

View File

@ -21,14 +21,15 @@ package org.elasticsearch.index.cache.query.parser;
import org.apache.lucene.queryparser.classic.QueryParserSettings;
import org.apache.lucene.search.Query;
import org.elasticsearch.common.component.CloseableComponent;
import org.elasticsearch.index.IndexComponent;
import java.io.Closeable;
/**
* The main benefit of the query parser cache is to not parse the same query string on different shards.
* Less about long running query strings.
*/
public interface QueryParserCache extends IndexComponent, CloseableComponent {
public interface QueryParserCache extends IndexComponent, Closeable {
Query get(QueryParserSettings queryString);

View File

@ -29,7 +29,6 @@ import org.apache.lucene.search.join.BitDocIdSetFilter;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.component.CloseableComponent;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.unit.ByteSizeValue;
@ -42,12 +41,13 @@ import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.translog.Translog;
import java.io.Closeable;
import java.util.List;
/**
*
*/
public interface Engine extends CloseableComponent {
public interface Engine extends Closeable {
static final String INDEX_CODEC = "index.codec";
static ByteSizeValue INACTIVE_SHARD_INDEXING_BUFFER = ByteSizeValue.parseBytesSizeValue("500kb");

View File

@ -30,13 +30,14 @@ import org.elasticsearch.index.shard.IndexShardComponent;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.Closeable;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
/**
*
*/
public abstract class MergeSchedulerProvider extends AbstractIndexShardComponent implements IndexShardComponent {
public abstract class MergeSchedulerProvider extends AbstractIndexShardComponent implements IndexShardComponent, Closeable {
public static interface FailureListener {
void onFailedMerge(MergePolicy.MergeException e);

View File

@ -54,6 +54,7 @@ import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.indices.IndicesLifecycle;
import org.elasticsearch.percolator.PercolatorService;
import java.io.Closeable;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
@ -66,7 +67,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
* Once a document type has been created, the real-time percolator will start to listen to write events and update the
* this registry with queries in real time.
*/
public class PercolatorQueriesRegistry extends AbstractIndexShardComponent {
public class PercolatorQueriesRegistry extends AbstractIndexShardComponent implements Closeable{
// This is a shard level service, but these below are index level service:
private final IndexQueryParserService queryParserService;

View File

@ -39,7 +39,6 @@ public class ShardId implements Serializable, Streamable, Comparable<ShardId> {
private int hashCode;
private ShardId() {
}
public ShardId(String index, int shardId) {

View File

@ -41,6 +41,7 @@ import org.elasticsearch.common.lucene.Directories;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.lucene.store.InputStreamIndexInput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.Callback;
import org.elasticsearch.common.util.concurrent.AbstractRefCounted;
import org.elasticsearch.common.util.concurrent.RefCounted;
import org.elasticsearch.env.ShardLock;
@ -92,6 +93,7 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
private final StoreDirectory directory;
private final ReentrantReadWriteLock metadataLock = new ReentrantReadWriteLock();
private final ShardLock shardLock;
private final OnClose onClose;
private final AbstractRefCounted refCounter = new AbstractRefCounted("store") {
@Override
@ -101,12 +103,18 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
}
};
@Inject
public Store(ShardId shardId, @IndexSettings Settings indexSettings, DirectoryService directoryService, Distributor distributor, ShardLock shardLock) throws IOException {
this(shardId, indexSettings, directoryService, distributor, shardLock, OnClose.EMPTY);
}
@Inject
public Store(ShardId shardId, @IndexSettings Settings indexSettings, DirectoryService directoryService, Distributor distributor, ShardLock shardLock, OnClose onClose) throws IOException {
super(shardId, indexSettings);
this.directoryService = directoryService;
this.directory = new StoreDirectory(directoryService.newFromDistributor(distributor), Loggers.getLogger("index.store.deletes", indexSettings, shardId));
this.shardLock = shardLock;
this.onClose = onClose;
assert onClose != null;
assert shardLock != null;
assert shardLock.getShardId().equals(shardId);
}
@ -342,7 +350,11 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
private void closeInternal() {
try {
directory.innerClose(); // this closes the distributorDirectory as well
try {
directory.innerClose(); // this closes the distributorDirectory as well
} finally {
onClose.handle(shardLock);
}
} catch (IOException e) {
logger.debug("failed to close directory", e);
} finally {
@ -1278,4 +1290,19 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
directory().sync(Collections.singleton(uuid));
}
}
/**
* A listener that is executed once the store is closed and all references to it are released
*/
public static interface OnClose extends Callback<ShardLock> {
static final OnClose EMPTY = new OnClose() {
/**
* This method is called while the provided {@link org.elasticsearch.env.ShardLock} is held.
* This method is only called once after all resources for a store are released.
*/
@Override
public void handle(ShardLock Lock) {
}
};
}
}

View File

@ -21,7 +21,9 @@ package org.elasticsearch.index.store;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.Callback;
import org.elasticsearch.env.ShardLock;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.distributor.Distributor;
import org.elasticsearch.index.store.distributor.LeastUsedDistributor;
import org.elasticsearch.index.store.distributor.RandomWeightedDistributor;
@ -39,13 +41,15 @@ public class StoreModule extends AbstractModule {
private final IndexStore indexStore;
private final ShardLock lock;
private final Store.OnClose closeCallback;
private Class<? extends Distributor> distributor;
public StoreModule(Settings settings, IndexStore indexStore, ShardLock lock) {
public StoreModule(Settings settings, IndexStore indexStore, ShardLock lock, Store.OnClose closeCallback) {
this.indexStore = indexStore;
this.settings = settings;
this.lock = lock;
this.closeCallback = closeCallback;
}
public void setDistributor(Class<? extends Distributor> distributor) {
@ -57,6 +61,8 @@ public class StoreModule extends AbstractModule {
bind(DirectoryService.class).to(indexStore.shardDirectory()).asEagerSingleton();
bind(Store.class).asEagerSingleton();
bind(ShardLock.class).toInstance(lock);
bind(Store.OnClose.class).toInstance(closeCallback);
if (distributor == null) {
distributor = loadDistributor(settings);
}

View File

@ -36,6 +36,7 @@ import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.Closeable;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadLocalRandom;
@ -44,7 +45,7 @@ import static org.elasticsearch.common.unit.TimeValue.timeValueMillis;
/**
*
*/
public class TranslogService extends AbstractIndexShardComponent {
public class TranslogService extends AbstractIndexShardComponent implements Closeable {
private static final String FLUSH_THRESHOLD_OPS_KEY = "flush_threshold_ops";
private static final String FLUSH_THRESHOLD_SIZE_KEY = "flush_threshold_size";

View File

@ -30,6 +30,7 @@ import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags.Flag;
import org.elasticsearch.action.admin.indices.stats.IndexShardStats;
import org.elasticsearch.action.admin.indices.stats.ShardStats;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.*;
import org.elasticsearch.common.settings.Settings;
@ -71,6 +72,7 @@ import java.io.Closeable;
import java.io.IOException;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
@ -97,16 +99,12 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService> i
private final PluginsService pluginsService;
private final NodeEnvironment nodeEnv;
private final Map<String, Injector> indicesInjectors = new HashMap<>();
private volatile ImmutableMap<String, IndexService> indices = ImmutableMap.of();
private volatile Map<String, Tuple<IndexService, Injector>> indices = ImmutableMap.of();
private final OldShardsStats oldShardsStats = new OldShardsStats();
@Inject
public IndicesService(Settings settings, IndicesLifecycle indicesLifecycle, IndicesAnalysisService indicesAnalysisService, Injector injector, NodeEnvironment nodeEnv) {
public IndicesService(Settings settings, IndicesLifecycle indicesLifecycle, IndicesAnalysisService indicesAnalysisService, Injector injector) {
super(settings);
this.indicesLifecycle = (InternalIndicesLifecycle) indicesLifecycle;
this.indicesAnalysisService = indicesAnalysisService;
@ -115,7 +113,6 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService> i
this.pluginsService = injector.getInstance(PluginsService.class);
this.indicesLifecycle.addListener(oldShardsStats);
this.nodeEnv = nodeEnv;
}
@Override
@ -136,7 +133,7 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService> i
try {
removeIndex(index, "shutdown", false);
} catch (Throwable e) {
logger.warn("failed to delete index on stop [" + index + "]", e);
logger.warn("failed to remove index on stop [" + index + "]", e);
} finally {
latch.countDown();
}
@ -203,7 +200,8 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService> i
}
Map<Index, List<IndexShardStats>> statsByShard = Maps.newHashMap();
for (IndexService indexService : indices.values()) {
for (Tuple<IndexService, Injector> value : indices.values()) {
IndexService indexService = value.v1();
for (IndexShard indexShard : indexService) {
try {
if (indexShard.routingEntry() == null) {
@ -232,32 +230,31 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService> i
}
@Override
public UnmodifiableIterator<IndexService> iterator() {
return indices.values().iterator();
public Iterator<IndexService> iterator() {
return Iterators.transform(indices.values().iterator(), new Function<Tuple<IndexService, Injector>, IndexService>() {
@Override
public IndexService apply(Tuple<IndexService, Injector> input) {
return input.v1();
}
});
}
public boolean hasIndex(String index) {
return indices.containsKey(index);
}
/**
* Returns a snapshot of the started indices and the associated {@link IndexService} instances.
*
* The map being returned is not a live view and subsequent calls can return a different view.
*/
public ImmutableMap<String, IndexService> indices() {
return indices;
}
/**
* Returns an IndexService for the specified index if exists otherwise returns <code>null</code>.
*
* Even if the index name appeared in {@link #indices()} <code>null</code> can still be returned as an
* index maybe removed in the meantime, so preferable use the associated {@link IndexService} in order to prevent NPE.
*/
@Nullable
public IndexService indexService(String index) {
return indices.get(index);
Tuple<IndexService, Injector> indexServiceInjectorTuple = indices.get(index);
if (indexServiceInjectorTuple == null) {
return null;
} else {
return indexServiceInjectorTuple.v1();
}
}
/**
@ -276,7 +273,7 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService> i
throw new ElasticsearchIllegalStateException("Can't create an index [" + sIndexName + "], node is closed");
}
Index index = new Index(sIndexName);
if (indicesInjectors.containsKey(index.name())) {
if (indices.containsKey(index.name())) {
throw new IndexAlreadyExistsException(index);
}
@ -315,13 +312,11 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService> i
throw new IndexCreationException(index, e);
}
indicesInjectors.put(index.name(), indexInjector);
IndexService indexService = indexInjector.getInstance(IndexService.class);
indicesLifecycle.afterIndexCreated(indexService);
indices = newMapBuilder(indices).put(index.name(), indexService).immutableMap();
indices = newMapBuilder(indices).put(index.name(), new Tuple<>(indexService, indexInjector)).immutableMap();
return indexService;
}
@ -354,14 +349,15 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService> i
final IndexService indexService;
final Injector indexInjector;
synchronized (this) {
indexInjector = indicesInjectors.remove(index);
if (indexInjector == null) {
if (indices.containsKey(index) == false) {
return;
}
logger.debug("[{}] closing ... (reason [{}])", index, reason);
Map<String, IndexService> tmpMap = newHashMap(indices);
indexService = tmpMap.remove(index);
Map<String, Tuple<IndexService, Injector>> tmpMap = newHashMap(indices);
Tuple<IndexService, Injector> remove = tmpMap.remove(index);
indexService = remove.v1();
indexInjector = remove.v2();
indices = ImmutableMap.copyOf(tmpMap);
}
@ -377,7 +373,7 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService> i
}));
logger.debug("[{}] closing index service (reason [{}])", index, reason);
indexService.close(reason);
indexService.close(reason, delete);
logger.debug("[{}] closing index cache (reason [{}])", index, reason);
indexInjector.getInstance(IndexCache.class).close();
@ -394,8 +390,6 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService> i
logger.debug("[{}] closing index service (reason [{}])", index, reason);
indexInjector.getInstance(IndexStore.class).close();
Injectors.close(injector);
logger.debug("[{}] closed... (reason [{}])", index, reason);
indicesLifecycle.afterIndexClosed(indexService.index());
if (delete) {

View File

@ -161,9 +161,8 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
// are going to recover them again once state persistence is disabled (no master / not recovered)
// TODO: this feels a bit hacky here, a block disables state persistence, and then we clean the allocated shards, maybe another flag in blocks?
if (event.state().blocks().disableStatePersistence()) {
for (Map.Entry<String, IndexService> entry : indicesService.indices().entrySet()) {
String index = entry.getKey();
IndexService indexService = entry.getValue();
for (IndexService indexService : indicesService) {
String index = indexService.index().getName();
for (Integer shardId : indexService.shardIds()) {
logger.debug("[{}][{}] removing shard (disabled block persistence)", index, shardId);
try {
@ -220,11 +219,10 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
private void applyCleanedIndices(final ClusterChangedEvent event) {
// handle closed indices, since they are not allocated on a node once they are closed
// so applyDeletedIndices might not take them into account
for (Map.Entry<String, IndexService> entry : indicesService.indices().entrySet()) {
String index = entry.getKey();
for (IndexService indexService : indicesService) {
String index = indexService.index().getName();
IndexMetaData indexMetaData = event.state().metaData().index(index);
if (indexMetaData != null && indexMetaData.state() == IndexMetaData.State.CLOSE) {
IndexService indexService = entry.getValue();
for (Integer shardId : indexService.shardIds()) {
logger.debug("[{}][{}] removing shard (index is closed)", index, shardId);
try {
@ -235,9 +233,8 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
}
}
}
for (Map.Entry<String, IndexService> entry : indicesService.indices().entrySet()) {
String index = entry.getKey();
IndexService indexService = entry.getValue();
for (IndexService indexService : indicesService) {
String index = indexService.index().getName();
if (indexService.shardIds().isEmpty()) {
if (logger.isDebugEnabled()) {
logger.debug("[{}] cleaning index (no shards allocated)", index);
@ -249,7 +246,8 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
}
private void applyDeletedIndices(final ClusterChangedEvent event) {
for (final String index : indicesService.indices().keySet()) {
for (IndexService indexService : indicesService) {
final String index = indexService.index().name();
if (!event.state().metaData().hasIndex(index)) {
if (logger.isDebugEnabled()) {
logger.debug("[{}] cleaning index, no longer part of the metadata", index);

View File

@ -32,7 +32,7 @@ import org.elasticsearch.common.settings.Settings;
*
*
*/
public interface Node extends Releasable{
public interface Node extends Releasable {
/**
* The settings that were used to create the node.

View File

@ -395,8 +395,6 @@ public final class InternalNode implements Node {
injector.getInstance(NodeEnvironment.class).close();
injector.getInstance(PageCacheRecycler.class).close();
Injectors.close(injector);
CachedStreams.clear();
logger.info("closed");

View File

@ -21,6 +21,7 @@ package org.elasticsearch.repositories;
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableMap;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.*;
@ -42,6 +43,7 @@ import org.elasticsearch.snapshots.RestoreService;
import org.elasticsearch.snapshots.SnapshotsService;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@ -104,7 +106,7 @@ public class RepositoriesService extends AbstractComponent implements ClusterSta
}
@Override
public ClusterState execute(ClusterState currentState) {
public ClusterState execute(ClusterState currentState) throws IOException {
ensureRepositoryNotInUse(currentState, request.name);
// Trying to create the new repository on master to make sure it works
if (!registerRepository(newRepositoryMetaData)) {
@ -345,7 +347,7 @@ public class RepositoriesService extends AbstractComponent implements ClusterSta
* @param repositoryMetaData new repository metadata
* @return {@code true} if new repository was added or {@code false} if it was ignored
*/
private boolean registerRepository(RepositoryMetaData repositoryMetaData) {
private boolean registerRepository(RepositoryMetaData repositoryMetaData) throws IOException {
RepositoryHolder previous = repositories.get(repositoryMetaData.name());
if (previous != null) {
if (!previous.type.equals(repositoryMetaData.type()) && previous.settings.equals(repositoryMetaData.settings())) {
@ -370,11 +372,8 @@ public class RepositoriesService extends AbstractComponent implements ClusterSta
* @param name repository name
* @param holder repository holder
*/
private void closeRepository(String name, RepositoryHolder holder) {
private void closeRepository(String name, RepositoryHolder holder) throws IOException {
logger.debug("closing repository [{}][{}]", holder.type, name);
if (holder.injector != null) {
Injectors.close(holder.injector);
}
if (holder.repository != null) {
holder.repository.close();
}
@ -398,9 +397,6 @@ public class RepositoriesService extends AbstractComponent implements ClusterSta
repository.start();
return new RepositoryHolder(repositoryMetaData.type(), repositoryMetaData.settings(), repositoryInjector, repository, indexShardRepository);
} catch (Throwable t) {
if (repositoryInjector != null) {
Injectors.close(repositoryInjector);
}
logger.warn("failed to create repository [{}][{}]", t, repositoryMetaData.type(), repositoryMetaData.name());
throw new RepositoryException(repositoryMetaData.name(), "failed to create repository", t);
}
@ -460,7 +456,6 @@ public class RepositoriesService extends AbstractComponent implements ClusterSta
private final String type;
private final Settings settings;
private final Injector injector;
private final Repository repository;
private final IndexShardRepository indexShardRepository;
@ -469,7 +464,6 @@ public class RepositoriesService extends AbstractComponent implements ClusterSta
this.settings = settings;
this.repository = repository;
this.indexShardRepository = indexShardRepository;
this.injector = injector;
}
}

View File

@ -20,12 +20,13 @@
package org.elasticsearch.rest;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.component.CloseableComponent;
import java.io.Closeable;
/**
* A filter allowing to filter rest operations.
*/
public abstract class RestFilter implements CloseableComponent {
public abstract class RestFilter implements Closeable {
/**
* Optionally, the order of the filter. Execution is done from lowest value to highest.

View File

@ -200,8 +200,6 @@ public class RiversService extends AbstractLifecycleComponent<RiversService> {
}
river.close();
Injectors.close(injector);
}
private class ApplyRivers implements RiverClusterStateListener {

View File

@ -36,6 +36,7 @@ import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

View File

@ -32,6 +32,7 @@ import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.TestUtil;
import org.apache.lucene.util.Version;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.env.ShardLock;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.distributor.Distributor;
@ -47,6 +48,7 @@ import java.io.IOException;
import java.nio.file.NoSuchFileException;
import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.zip.Adler32;
import static com.carrotsearch.randomizedtesting.RandomizedTest.*;
@ -1024,4 +1026,28 @@ public class StoreTest extends ElasticsearchLuceneTestCase {
store.deleteContent();
IOUtils.close(store);
}
public void testOnCloseCallback() throws IOException {
final ShardId shardId = new ShardId(new Index(randomRealisticUnicodeOfCodepointLengthBetween(1, 10)), randomIntBetween(0, 100));
DirectoryService directoryService = new LuceneManagedDirectoryService(random());
final AtomicInteger count = new AtomicInteger(0);
final ShardLock lock = new DummyShardLock(shardId);
Store store = new Store(shardId, ImmutableSettings.EMPTY, directoryService, randomDistributor(directoryService), lock , new Store.OnClose() {
@Override
public void handle(ShardLock theLock) {
assertEquals(shardId, theLock.getShardId());
assertEquals(lock, theLock);
count.incrementAndGet();
}
});
assertEquals(count.get(), 0);
final int iters = randomIntBetween(1, 10);
for (int i = 0; i < iters; i++) {
store.close();
}
assertEquals(count.get(), 1);
}
}

View File

@ -58,7 +58,7 @@ public class IndicesLeaksTests extends ElasticsearchIntegrationTest {
IndexService indexService = indicesService.indexServiceSafe("test");
Injector indexInjector = indexService.injector();
IndexShard shard = indexService.shardSafe(0);
Injector shardInjector = indexService.shardInjector(0);
Injector shardInjector = indexService.shardInjectorSafe(0);
performCommonOperations();

View File

@ -308,6 +308,7 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase
* per index basis. Allows to enable/disable the randomization for number of shards and replicas
*/
private void randomIndexTemplate() throws IOException {
// TODO move settings for random directory etc here into the index based randomized settings.
if (cluster().size() > 0) {
ImmutableSettings.Builder randomSettingsBuilder =