cleanup deletion of content in shards

we are very conservative on when we delete data, remove the actual options of deleting data that are not used
This commit is contained in:
Shay Banon 2013-03-04 20:41:19 -08:00
parent 1ed07c1794
commit 3e264f6b95
29 changed files with 100 additions and 164 deletions

View File

@ -365,7 +365,7 @@ public class MetaDataCreateIndexService extends AbstractComponent {
logger.warn("[{}] failed to create", e, request.index); logger.warn("[{}] failed to create", e, request.index);
if (indexCreated) { if (indexCreated) {
// Index was already partially created - need to clean up // Index was already partially created - need to clean up
indicesService.deleteIndex(request.index, failureReason != null ? failureReason : "failed to create index"); indicesService.removeIndex(request.index, failureReason != null ? failureReason : "failed to create index");
} }
listener.onFailure(e); listener.onFailure(e);
return currentState; return currentState;

View File

@ -180,7 +180,7 @@ public class MetaDataIndexAliasesService extends AbstractComponent {
} }
} finally { } finally {
for (String index : indicesToClose) { for (String index : indicesToClose) {
indicesService.cleanIndex(index, "created for alias processing"); indicesService.removeIndex(index, "created for alias processing");
} }
} }
} }

View File

@ -148,7 +148,7 @@ public class MetaDataMappingService extends AbstractComponent {
return currentState; return currentState;
} finally { } finally {
if (createdIndex) { if (createdIndex) {
indicesService.cleanIndex(index, "created for mapping processing"); indicesService.removeIndex(index, "created for mapping processing");
} }
} }
} }
@ -209,7 +209,7 @@ public class MetaDataMappingService extends AbstractComponent {
return currentState; return currentState;
} finally { } finally {
if (createdIndex) { if (createdIndex) {
indicesService.cleanIndex(index, "created for mapping processing"); indicesService.removeIndex(index, "created for mapping processing");
} }
} }
} }
@ -409,7 +409,7 @@ public class MetaDataMappingService extends AbstractComponent {
return currentState; return currentState;
} finally { } finally {
for (String index : indicesToClose) { for (String index : indicesToClose) {
indicesService.cleanIndex(index, "created for mapping processing"); indicesService.removeIndex(index, "created for mapping processing");
} }
} }
} }

View File

@ -29,9 +29,6 @@ public interface CloseableIndexComponent {
/** /**
* Closes the index component. A boolean indicating if its part of an actual index * Closes the index component. A boolean indicating if its part of an actual index
* deletion or not is passed. * deletion or not is passed.
*
* @param delete <tt>true</tt> if the index is being deleted.
* @throws ElasticSearchException
*/ */
void close(boolean delete) throws ElasticSearchException; void close() throws ElasticSearchException;
} }

View File

@ -311,18 +311,14 @@ public class IndexShardGatewayService extends AbstractIndexShardComponent implem
} }
} }
public synchronized void close(boolean delete) { @Override
public synchronized void close() {
indexSettingsService.removeListener(applySettings); indexSettingsService.removeListener(applySettings);
if (snapshotScheduleFuture != null) { if (snapshotScheduleFuture != null) {
snapshotScheduleFuture.cancel(true); snapshotScheduleFuture.cancel(true);
snapshotScheduleFuture = null; snapshotScheduleFuture = null;
} }
// don't really delete the shard gateway if we are *not* primary, shardGateway.close();
// the primary will close it
if (!indexShard.routingEntry().primary()) {
delete = false;
}
shardGateway.close(delete);
if (snapshotLock != null) { if (snapshotLock != null) {
snapshotLock.release(); snapshotLock.release();
} }

View File

@ -82,9 +82,6 @@ public abstract class BlobStoreIndexGateway extends AbstractIndexComponent imple
} }
@Override @Override
public void close(boolean delete) throws ElasticSearchException { public void close() throws ElasticSearchException {
if (delete) {
blobStore.delete(indexPath);
}
} }
} }

View File

@ -130,10 +130,7 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo
} }
@Override @Override
public void close(boolean delete) throws ElasticSearchException { public void close() throws ElasticSearchException {
if (delete) {
blobStore.delete(shardPath);
}
} }
@Override @Override

View File

@ -53,6 +53,6 @@ public class LocalIndexGateway extends AbstractIndexComponent implements IndexGa
} }
@Override @Override
public void close(boolean delete) { public void close() {
} }
} }

View File

@ -19,7 +19,6 @@
package org.elasticsearch.index.gateway.local; package org.elasticsearch.index.gateway.local;
import com.google.common.io.Closeables;
import org.apache.lucene.index.IndexWriter; import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig; import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.SegmentInfos; import org.apache.lucene.index.SegmentInfos;
@ -234,10 +233,14 @@ public class LocalIndexShardGateway extends AbstractIndexShardComponent implemen
} }
} catch (Throwable e) { } catch (Throwable e) {
// we failed to recovery, make sure to delete the translog file (and keep the recovering one) // we failed to recovery, make sure to delete the translog file (and keep the recovering one)
indexShard.translog().close(true); indexShard.translog().closeWithDelete();
throw new IndexShardGatewayRecoveryException(shardId, "failed to recover shard", e); throw new IndexShardGatewayRecoveryException(shardId, "failed to recover shard", e);
} finally { } finally {
Closeables.closeQuietly(fs); try {
fs.close();
} catch (IOException e) {
// ignore
}
} }
indexShard.performRecoveryFinalization(true); indexShard.performRecoveryFinalization(true);
@ -277,7 +280,7 @@ public class LocalIndexShardGateway extends AbstractIndexShardComponent implemen
} }
@Override @Override
public void close(boolean delete) { public void close() {
if (flushScheduler != null) { if (flushScheduler != null) {
flushScheduler.cancel(false); flushScheduler.cancel(false);
} }

View File

@ -53,6 +53,6 @@ public class NoneIndexGateway extends AbstractIndexComponent implements IndexGat
} }
@Override @Override
public void close(boolean delete) { public void close() {
} }
} }

View File

@ -65,6 +65,7 @@ public class NoneIndexShardGateway extends AbstractIndexShardComponent implement
// in the none case, we simply start the shard // in the none case, we simply start the shard
// clean the store, there should be nothing there... // clean the store, there should be nothing there...
try { try {
logger.info("deleting shard content");
indexShard.store().deleteContent(); indexShard.store().deleteContent();
} catch (IOException e) { } catch (IOException e) {
logger.warn("failed to clean store before starting shard", e); logger.warn("failed to clean store before starting shard", e);
@ -106,7 +107,7 @@ public class NoneIndexShardGateway extends AbstractIndexShardComponent implement
} }
@Override @Override
public void close(boolean delete) { public void close() {
} }
@Override @Override

View File

@ -93,7 +93,7 @@ public class LogByteSizeMergePolicyProvider extends AbstractIndexShardComponent
} }
@Override @Override
public void close(boolean delete) throws ElasticSearchException { public void close() throws ElasticSearchException {
indexSettingsService.removeListener(applySettings); indexSettingsService.removeListener(applySettings);
} }

View File

@ -70,7 +70,7 @@ public class LogDocMergePolicyProvider extends AbstractIndexShardComponent imple
} }
@Override @Override
public void close(boolean delete) throws ElasticSearchException { public void close() throws ElasticSearchException {
indexSettingsService.removeListener(applySettings); indexSettingsService.removeListener(applySettings);
} }

View File

@ -111,7 +111,7 @@ public class TieredMergePolicyProvider extends AbstractIndexShardComponent imple
} }
@Override @Override
public void close(boolean delete) throws ElasticSearchException { public void close() throws ElasticSearchException {
indexSettingsService.removeListener(applySettings); indexSettingsService.removeListener(applySettings);
} }

View File

@ -71,11 +71,6 @@ public interface IndexService extends IndexComponent, Iterable<IndexShard> {
IndexShard createShard(int sShardId) throws ElasticSearchException; IndexShard createShard(int sShardId) throws ElasticSearchException;
/**
* Cleans the shard locally, does not touch the gateway!.
*/
void cleanShard(int shardId, String reason) throws ElasticSearchException;
/** /**
* Removes the shard, does not delete local data or the gateway. * Removes the shard, does not delete local data or the gateway.
*/ */

View File

@ -27,10 +27,8 @@ import org.elasticsearch.ElasticSearchIllegalStateException;
import org.elasticsearch.ElasticSearchInterruptedException; import org.elasticsearch.ElasticSearchInterruptedException;
import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.inject.*; import org.elasticsearch.common.inject.*;
import org.elasticsearch.common.io.FileSystemUtils;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.gateway.none.NoneGateway;
import org.elasticsearch.index.*; import org.elasticsearch.index.*;
import org.elasticsearch.index.aliases.IndexAliasesService; import org.elasticsearch.index.aliases.IndexAliasesService;
import org.elasticsearch.index.analysis.AnalysisService; import org.elasticsearch.index.analysis.AnalysisService;
@ -72,7 +70,6 @@ import org.elasticsearch.plugins.PluginsService;
import org.elasticsearch.plugins.ShardsPluginsModule; import org.elasticsearch.plugins.ShardsPluginsModule;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
@ -255,7 +252,7 @@ public class InternalIndexService extends AbstractIndexComponent implements Inde
return indexEngine; return indexEngine;
} }
public void close(final boolean delete, final String reason, @Nullable Executor executor) { public void close(final String reason, @Nullable Executor executor) {
synchronized (this) { synchronized (this) {
closed = true; closed = true;
} }
@ -267,9 +264,9 @@ public class InternalIndexService extends AbstractIndexComponent implements Inde
@Override @Override
public void run() { public void run() {
try { try {
deleteShard(shardId, delete, !delete, delete, reason); removeShard(shardId, reason);
} catch (Exception e) { } catch (Exception e) {
logger.warn("failed to close shard, delete [{}]", e, delete); logger.warn("failed to close shard", e);
} finally { } finally {
latch.countDown(); latch.countDown();
} }
@ -345,32 +342,17 @@ public class InternalIndexService extends AbstractIndexComponent implements Inde
return indexShard; return indexShard;
} }
@Override
public synchronized void cleanShard(int shardId, String reason) throws ElasticSearchException {
deleteShard(shardId, true, false, false, reason);
}
@Override @Override
public synchronized void removeShard(int shardId, String reason) throws ElasticSearchException { public synchronized void removeShard(int shardId, String reason) throws ElasticSearchException {
deleteShard(shardId, false, false, false, reason);
}
private void deleteShard(int shardId, boolean delete, boolean snapshotGateway, boolean deleteGateway, String reason) throws ElasticSearchException {
Injector shardInjector; Injector shardInjector;
IndexShard indexShard; IndexShard indexShard;
synchronized (this) { synchronized (this) {
Map<Integer, Injector> tmpShardInjectors = newHashMap(shardsInjectors); Map<Integer, Injector> tmpShardInjectors = newHashMap(shardsInjectors);
shardInjector = tmpShardInjectors.remove(shardId); shardInjector = tmpShardInjectors.remove(shardId);
if (shardInjector == null) { if (shardInjector == null) {
if (!delete) {
return; return;
} }
throw new IndexShardMissingException(new ShardId(index, shardId));
}
shardsInjectors = ImmutableMap.copyOf(tmpShardInjectors); shardsInjectors = ImmutableMap.copyOf(tmpShardInjectors);
if (delete) {
logger.debug("deleting shard_id [{}]", shardId);
}
Map<Integer, IndexShard> tmpShardsMap = newHashMap(shards); Map<Integer, IndexShard> tmpShardsMap = newHashMap(shards);
indexShard = tmpShardsMap.remove(shardId); indexShard = tmpShardsMap.remove(shardId);
@ -379,11 +361,11 @@ public class InternalIndexService extends AbstractIndexComponent implements Inde
ShardId sId = new ShardId(index, shardId); ShardId sId = new ShardId(index, shardId);
indicesLifecycle.beforeIndexShardClosed(sId, indexShard, delete); indicesLifecycle.beforeIndexShardClosed(sId, indexShard);
for (Class<? extends CloseableIndexComponent> closeable : pluginsService.shardServices()) { for (Class<? extends CloseableIndexComponent> closeable : pluginsService.shardServices()) {
try { try {
shardInjector.getInstance(closeable).close(delete); shardInjector.getInstance(closeable).close();
} catch (Exception e) { } catch (Exception e) {
logger.debug("failed to clean plugin shard service [{}]", e, closeable); logger.debug("failed to clean plugin shard service [{}]", e, closeable);
} }
@ -415,59 +397,45 @@ public class InternalIndexService extends AbstractIndexComponent implements Inde
} }
try { try {
shardInjector.getInstance(MergePolicyProvider.class).close(delete); shardInjector.getInstance(MergePolicyProvider.class).close();
} catch (Exception e) { } catch (Exception e) {
logger.debug("failed to close merge policy provider", e); logger.debug("failed to close merge policy provider", e);
// ignore // ignore
} }
try { try {
// now, we can snapshot to the gateway, it will be only the translog
if (snapshotGateway) {
shardInjector.getInstance(IndexShardGatewayService.class).snapshotOnClose(); shardInjector.getInstance(IndexShardGatewayService.class).snapshotOnClose();
}
} catch (Exception e) { } catch (Exception e) {
logger.debug("failed to snapshot gateway on close", e); logger.debug("failed to snapshot index shard gateway on close", e);
// ignore // ignore
} }
try { try {
shardInjector.getInstance(IndexShardGatewayService.class).close(deleteGateway); shardInjector.getInstance(IndexShardGatewayService.class).close();
} catch (Exception e) { } catch (Exception e) {
logger.debug("failed to close index shard gateway", e); logger.debug("failed to close index shard gateway", e);
// ignore // ignore
} }
try { try {
// now we can close the translog // now we can close the translog
shardInjector.getInstance(Translog.class).close(delete); shardInjector.getInstance(Translog.class).close();
} catch (Exception e) { } catch (Exception e) {
logger.debug("failed to close translog", e); logger.debug("failed to close translog", e);
// ignore // ignore
} }
// call this before we close the store, so we can release resources for it // call this before we close the store, so we can release resources for it
indicesLifecycle.afterIndexShardClosed(sId, delete); indicesLifecycle.afterIndexShardClosed(sId);
// if we delete or have no gateway or the store is not persistent, clean the store... // if we delete or have no gateway or the store is not persistent, clean the store...
Store store = shardInjector.getInstance(Store.class); Store store = shardInjector.getInstance(Store.class);
if (delete || indexGateway.type().equals(NoneGateway.TYPE) || !indexStore.persistent()) {
try {
store.fullDelete();
} catch (IOException e) {
logger.warn("failed to clean store on shard deletion", e);
}
}
// and close it // and close it
try { try {
store.close(); store.close();
} catch (IOException e) { } catch (Exception e) {
logger.warn("failed to close store on shard deletion", e); logger.warn("failed to close store on shard deletion", e);
} }
Injectors.close(injector); Injectors.close(injector);
// delete the shard location if needed
if (delete || indexGateway.type().equals(NoneGateway.TYPE)) {
FileSystemUtils.deleteRecursively(nodeEnv.shardLocations(sId));
}
} }
} }

View File

@ -33,6 +33,7 @@ import org.elasticsearch.common.lucene.store.BufferedChecksumIndexOutput;
import org.elasticsearch.common.lucene.store.ChecksumIndexOutput; import org.elasticsearch.common.lucene.store.ChecksumIndexOutput;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.index.CloseableIndexComponent;
import org.elasticsearch.index.settings.IndexSettings; import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.shard.AbstractIndexShardComponent; import org.elasticsearch.index.shard.AbstractIndexShardComponent;
import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardId;
@ -50,7 +51,7 @@ import java.util.zip.Adler32;
/** /**
*/ */
public class Store extends AbstractIndexShardComponent { public class Store extends AbstractIndexShardComponent implements CloseableIndexComponent {
static final String CHECKSUMS_PREFIX = "_checksums-"; static final String CHECKSUMS_PREFIX = "_checksums-";
@ -112,6 +113,9 @@ public class Store extends AbstractIndexShardComponent {
return md; return md;
} }
/**
* Deletes the content of a shard store. Be careful calling this!.
*/
public void deleteContent() throws IOException { public void deleteContent() throws IOException {
String[] files = directory.listAll(); String[] files = directory.listAll();
IOException lastException = null; IOException lastException = null;
@ -137,13 +141,6 @@ public class Store extends AbstractIndexShardComponent {
} }
} }
public void fullDelete() throws IOException {
deleteContent();
for (Directory delegate : directory.delegates()) {
directoryService.fullDelete(delegate);
}
}
public StoreStats stats() throws IOException { public StoreStats stats() throws IOException {
return new StoreStats(Directories.estimateSize(directory), directoryService.throttleTimeInNanos()); return new StoreStats(Directories.estimateSize(directory), directoryService.throttleTimeInNanos());
} }
@ -248,8 +245,12 @@ public class Store extends AbstractIndexShardComponent {
return false; return false;
} }
public void close() throws IOException { public void close() {
try {
directory.close(); directory.close();
} catch (IOException e) {
logger.debug("failed to close directory", e);
}
} }
/** /**

View File

@ -102,7 +102,7 @@ public abstract class AbstractIndexStore extends AbstractIndexComponent implemen
} }
@Override @Override
public void close(boolean delete) throws ElasticSearchException { public void close() throws ElasticSearchException {
indexService.settingsService().removeListener(applySettings); indexService.settingsService().removeListener(applySettings);
} }

View File

@ -29,6 +29,7 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable; import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.index.CloseableIndexComponent;
import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.shard.IndexShardComponent; import org.elasticsearch.index.shard.IndexShardComponent;
@ -38,10 +39,12 @@ import java.io.InputStream;
/** /**
* *
*/ */
public interface Translog extends IndexShardComponent { public interface Translog extends IndexShardComponent, CloseableIndexComponent {
public static final String TRANSLOG_ID_KEY = "translog_id"; public static final String TRANSLOG_ID_KEY = "translog_id";
void closeWithDelete();
/** /**
* Returns the id of the current transaction log. * Returns the id of the current transaction log.
*/ */
@ -122,13 +125,6 @@ public interface Translog extends IndexShardComponent {
void syncOnEachOperation(boolean syncOnEachOperation); void syncOnEachOperation(boolean syncOnEachOperation);
/**
* Closes the transaction log.
* <p/>
* <p>Can only be called by one thread.
*/
void close(boolean delete);
static class Location { static class Location {
public final long translogId; public final long translogId;
public final long translogLocation; public final long translogLocation;

View File

@ -20,6 +20,7 @@
package org.elasticsearch.index.translog.fs; package org.elasticsearch.index.translog.fs;
import jsr166y.ThreadLocalRandom; import jsr166y.ThreadLocalRandom;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.FileSystemUtils; import org.elasticsearch.common.io.FileSystemUtils;
import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.BytesStreamOutput;
@ -118,7 +119,16 @@ public class FsTranslog extends AbstractIndexShardComponent implements Translog
} }
@Override @Override
public void close(boolean delete) { public void closeWithDelete() {
close(true);
}
@Override
public void close() throws ElasticSearchException {
close(false);
}
private void close(boolean delete) {
if (indexSettingsService != null) { if (indexSettingsService != null) {
indexSettingsService.removeListener(applySettings); indexSettingsService.removeListener(applySettings);
} }

View File

@ -97,9 +97,8 @@ public interface IndicesLifecycle {
* Called before the index get closed. * Called before the index get closed.
* *
* @param indexService The index service * @param indexService The index service
* @param delete Does the index gets closed because of a delete command, or because the node is shutting down
*/ */
public void beforeIndexClosed(IndexService indexService, boolean delete) { public void beforeIndexClosed(IndexService indexService) {
} }
@ -107,9 +106,8 @@ public interface IndicesLifecycle {
* Called after the index has been closed. * Called after the index has been closed.
* *
* @param index The index * @param index The index
* @param delete Does the index gets closed because of a delete command, or because the node is shutting down
*/ */
public void afterIndexClosed(Index index, boolean delete) { public void afterIndexClosed(Index index) {
} }
@ -117,9 +115,8 @@ public interface IndicesLifecycle {
* Called before the index shard gets closed. * Called before the index shard gets closed.
* *
* @param indexShard The index shard * @param indexShard The index shard
* @param delete Does the index shard gets closed because of a delete command, or because the node is shutting down
*/ */
public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexShard, boolean delete) { public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexShard) {
} }
@ -127,9 +124,8 @@ public interface IndicesLifecycle {
* Called after the index shard has been closed. * Called after the index shard has been closed.
* *
* @param shardId The shard id * @param shardId The shard id
* @param delete Does the index shard gets closed because of a delete command, or because the node is shutting down
*/ */
public void afterIndexShardClosed(ShardId shardId, boolean delete) { public void afterIndexShardClosed(ShardId shardId) {
} }
} }

View File

@ -55,10 +55,5 @@ public interface IndicesService extends Iterable<IndexService>, LifecycleCompone
IndexService createIndex(String index, Settings settings, String localNodeId) throws ElasticSearchException; IndexService createIndex(String index, Settings settings, String localNodeId) throws ElasticSearchException;
void deleteIndex(String index, String reason) throws ElasticSearchException; void removeIndex(String index, String reason) throws ElasticSearchException;
/**
* Cleans the index without actually deleting any content for it.
*/
void cleanIndex(String index, String reason) throws ElasticSearchException;
} }

View File

@ -87,27 +87,27 @@ public class InternalIndicesLifecycle extends AbstractComponent implements Indic
} }
} }
public void beforeIndexClosed(IndexService indexService, boolean delete) { public void beforeIndexClosed(IndexService indexService) {
for (Listener listener : listeners) { for (Listener listener : listeners) {
listener.beforeIndexClosed(indexService, delete); listener.beforeIndexClosed(indexService);
} }
} }
public void afterIndexClosed(Index index, boolean delete) { public void afterIndexClosed(Index index) {
for (Listener listener : listeners) { for (Listener listener : listeners) {
listener.afterIndexClosed(index, delete); listener.afterIndexClosed(index);
} }
} }
public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexShard, boolean delete) { public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexShard) {
for (Listener listener : listeners) { for (Listener listener : listeners) {
listener.beforeIndexShardClosed(shardId, indexShard, delete); listener.beforeIndexShardClosed(shardId, indexShard);
} }
} }
public void afterIndexShardClosed(ShardId shardId, boolean delete) { public void afterIndexShardClosed(ShardId shardId) {
for (Listener listener : listeners) { for (Listener listener : listeners) {
listener.afterIndexShardClosed(shardId, delete); listener.afterIndexShardClosed(shardId);
} }
} }
} }

View File

@ -27,7 +27,6 @@ import org.elasticsearch.ElasticSearchIllegalStateException;
import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.*; import org.elasticsearch.common.inject.*;
import org.elasticsearch.common.io.FileSystemUtils;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.env.NodeEnvironment;
@ -148,7 +147,7 @@ public class InternalIndicesService extends AbstractLifecycleComponent<IndicesSe
@Override @Override
public void run() { public void run() {
try { try {
deleteIndex(index, false, "shutdown", shardsStopExecutor); removeIndex(index, "shutdown", shardsStopExecutor);
} catch (Exception e) { } catch (Exception e) {
logger.warn("failed to delete index on stop [" + index + "]", e); logger.warn("failed to delete index on stop [" + index + "]", e);
} finally { } finally {
@ -311,42 +310,31 @@ public class InternalIndicesService extends AbstractLifecycleComponent<IndicesSe
} }
@Override @Override
public synchronized void cleanIndex(String index, String reason) throws ElasticSearchException { public synchronized void removeIndex(String index, String reason) throws ElasticSearchException {
deleteIndex(index, false, reason, null); removeIndex(index, reason, null);
} }
@Override private void removeIndex(String index, String reason, @Nullable Executor executor) throws ElasticSearchException {
public synchronized void deleteIndex(String index, String reason) throws ElasticSearchException {
deleteIndex(index, true, reason, null);
}
private void deleteIndex(String index, boolean delete, String reason, @Nullable Executor executor) throws ElasticSearchException {
Injector indexInjector; Injector indexInjector;
IndexService indexService; IndexService indexService;
synchronized (this) { synchronized (this) {
indexInjector = indicesInjectors.remove(index); indexInjector = indicesInjectors.remove(index);
if (indexInjector == null) { if (indexInjector == null) {
if (!delete) {
return; return;
} }
throw new IndexMissingException(new Index(index));
}
if (delete) {
logger.debug("deleting Index [{}]", index);
}
Map<String, IndexService> tmpMap = newHashMap(indices); Map<String, IndexService> tmpMap = newHashMap(indices);
indexService = tmpMap.remove(index); indexService = tmpMap.remove(index);
indices = ImmutableMap.copyOf(tmpMap); indices = ImmutableMap.copyOf(tmpMap);
} }
indicesLifecycle.beforeIndexClosed(indexService, delete); indicesLifecycle.beforeIndexClosed(indexService);
for (Class<? extends CloseableIndexComponent> closeable : pluginsService.indexServices()) { for (Class<? extends CloseableIndexComponent> closeable : pluginsService.indexServices()) {
indexInjector.getInstance(closeable).close(delete); indexInjector.getInstance(closeable).close();
} }
((InternalIndexService) indexService).close(delete, reason, executor); ((InternalIndexService) indexService).close(reason, executor);
indexInjector.getInstance(PercolatorService.class).close(); indexInjector.getInstance(PercolatorService.class).close();
indexInjector.getInstance(IndexCache.class).close(); indexInjector.getInstance(IndexCache.class).close();
@ -354,19 +342,15 @@ public class InternalIndicesService extends AbstractLifecycleComponent<IndicesSe
indexInjector.getInstance(AnalysisService.class).close(); indexInjector.getInstance(AnalysisService.class).close();
indexInjector.getInstance(IndexEngine.class).close(); indexInjector.getInstance(IndexEngine.class).close();
indexInjector.getInstance(IndexGateway.class).close(delete); indexInjector.getInstance(IndexGateway.class).close();
indexInjector.getInstance(MapperService.class).close(); indexInjector.getInstance(MapperService.class).close();
indexInjector.getInstance(IndexQueryParserService.class).close(); indexInjector.getInstance(IndexQueryParserService.class).close();
indexInjector.getInstance(IndexStore.class).close(delete); indexInjector.getInstance(IndexStore.class).close();
Injectors.close(injector); Injectors.close(injector);
indicesLifecycle.afterIndexClosed(indexService.index(), delete); indicesLifecycle.afterIndexClosed(indexService.index());
if (delete) {
FileSystemUtils.deleteRecursively(nodeEnv.indexLocations(new Index(index)));
}
} }
static class OldShardsStats extends IndicesLifecycle.Listener { static class OldShardsStats extends IndicesLifecycle.Listener {
@ -379,7 +363,7 @@ public class InternalIndicesService extends AbstractLifecycleComponent<IndicesSe
final FlushStats flushStats = new FlushStats(); final FlushStats flushStats = new FlushStats();
@Override @Override
public synchronized void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexShard, boolean delete) { public synchronized void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexShard) {
if (indexShard != null) { if (indexShard != null) {
getStats.add(indexShard.getStats()); getStats.add(indexShard.getStats());
indexingStats.add(indexShard.indexingStats(), false); indexingStats.add(indexShard.indexingStats(), false);

View File

@ -160,7 +160,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
logger.warn("[{}] failed to remove shard (disabled block persistence)", e, index); logger.warn("[{}] failed to remove shard (disabled block persistence)", e, index);
} }
} }
indicesService.cleanIndex(index, "cleaning index (disabled block persistence)"); indicesService.removeIndex(index, "cleaning index (disabled block persistence)");
} }
return; return;
} }
@ -218,7 +218,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
} }
// clean the index // clean the index
try { try {
indicesService.cleanIndex(index, "cleaning index (no shards allocated)"); indicesService.removeIndex(index, "removing index (no shards allocated)");
} catch (Exception e) { } catch (Exception e) {
logger.warn("[{}] failed to clean index (no shards of that index are allocated on this node)", e, index); logger.warn("[{}] failed to clean index (no shards of that index are allocated on this node)", e, index);
} }
@ -233,7 +233,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
logger.debug("[{}] cleaning index, no longer part of the metadata", index); logger.debug("[{}] cleaning index, no longer part of the metadata", index);
} }
try { try {
indicesService.cleanIndex(index, "index no longer part of the metadata"); indicesService.removeIndex(index, "index no longer part of the metadata");
} catch (Exception e) { } catch (Exception e) {
logger.warn("failed to clean index", e); logger.warn("failed to clean index", e);
} }

View File

@ -200,7 +200,7 @@ public class IndexingMemoryController extends AbstractLifecycleComponent<Indexin
} }
@Override @Override
public void afterIndexShardClosed(ShardId shardId, boolean delete) { public void afterIndexShardClosed(ShardId shardId) {
synchronized (mutex) { synchronized (mutex) {
calcAndSetShardIndexingBuffer("removed_shard[" + shardId.index().name() + "][" + shardId.id() + "]"); calcAndSetShardIndexingBuffer("removed_shard[" + shardId.index().name() + "][" + shardId.id() + "]");
shardsIndicesStatus.remove(shardId); shardsIndicesStatus.remove(shardId);

View File

@ -98,7 +98,7 @@ public class RecoveryTarget extends AbstractComponent {
indicesLifecycle.addListener(new IndicesLifecycle.Listener() { indicesLifecycle.addListener(new IndicesLifecycle.Listener() {
@Override @Override
public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexShard, boolean delete) { public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexShard) {
if (indexShard != null) { if (indexShard != null) {
removeAndCleanOnGoingRecovery(findRecoveryByShard(indexShard)); removeAndCleanOnGoingRecovery(findRecoveryByShard(indexShard));
} }

View File

@ -664,12 +664,12 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
class CleanContextOnIndicesLifecycleListener extends IndicesLifecycle.Listener { class CleanContextOnIndicesLifecycleListener extends IndicesLifecycle.Listener {
@Override @Override
public void beforeIndexClosed(IndexService indexService, boolean delete) { public void beforeIndexClosed(IndexService indexService) {
releaseContextsForIndex(indexService.index()); releaseContextsForIndex(indexService.index());
} }
@Override @Override
public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexShard, boolean delete) { public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexShard) {
releaseContextsForShard(shardId); releaseContextsForShard(shardId);
} }
} }

View File

@ -53,7 +53,7 @@ public abstract class AbstractSimpleTranslogTests {
@AfterMethod @AfterMethod
public void tearDown() { public void tearDown() {
translog.close(true); translog.closeWithDelete();
} }
protected abstract Translog create(); protected abstract Translog create();