Pass index settings where appropriate in IndicesLifecycle

This allows plugins to be able to perform some needed setup before and
after an index/shard is in use.
This commit is contained in:
Lee Hinman 2015-01-11 21:49:35 +01:00
parent 6b24921bd4
commit a3972f03c6
11 changed files with 85 additions and 56 deletions

View File

@ -41,7 +41,6 @@ import org.elasticsearch.index.cache.bitset.ShardBitsetFilterCacheModule;
import org.elasticsearch.index.cache.filter.ShardFilterCacheModule; import org.elasticsearch.index.cache.filter.ShardFilterCacheModule;
import org.elasticsearch.index.cache.query.ShardQueryCacheModule; import org.elasticsearch.index.cache.query.ShardQueryCacheModule;
import org.elasticsearch.index.deletionpolicy.DeletionPolicyModule; import org.elasticsearch.index.deletionpolicy.DeletionPolicyModule;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.fielddata.IndexFieldDataService; import org.elasticsearch.index.fielddata.IndexFieldDataService;
import org.elasticsearch.index.fielddata.ShardFieldDataModule; import org.elasticsearch.index.fielddata.ShardFieldDataModule;
import org.elasticsearch.index.gateway.IndexShardGatewayModule; import org.elasticsearch.index.gateway.IndexShardGatewayModule;
@ -295,7 +294,7 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone
throw new IndexShardAlreadyExistsException(shardId + " already exists"); throw new IndexShardAlreadyExistsException(shardId + " already exists");
} }
indicesLifecycle.beforeIndexShardCreated(shardId); indicesLifecycle.beforeIndexShardCreated(shardId, indexSettings);
logger.debug("creating shard_id {}", shardId); logger.debug("creating shard_id {}", shardId);
@ -368,7 +367,7 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone
private void closeShardInjector(String reason, ShardId sId, Injector shardInjector, IndexShard indexShard) { private void closeShardInjector(String reason, ShardId sId, Injector shardInjector, IndexShard indexShard) {
final int shardId = sId.id(); final int shardId = sId.id();
try { try {
indicesLifecycle.beforeIndexShardClosed(sId, indexShard); indicesLifecycle.beforeIndexShardClosed(sId, indexShard, indexSettings);
for (Class<? extends Closeable> closeable : pluginsService.shardServices()) { for (Class<? extends Closeable> closeable : pluginsService.shardServices()) {
try { try {
shardInjector.getInstance(closeable).close(); shardInjector.getInstance(closeable).close();
@ -396,7 +395,7 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone
PercolatorQueriesRegistry.class); PercolatorQueriesRegistry.class);
// call this before we close the store, so we can release resources for it // call this before we close the store, so we can release resources for it
indicesLifecycle.afterIndexShardClosed(sId, indexShard); indicesLifecycle.afterIndexShardClosed(sId, indexShard, indexSettings);
} finally { } finally {
try { try {
shardInjector.getInstance(Store.class).close(); shardInjector.getInstance(Store.class).close();

View File

@ -26,6 +26,8 @@ import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.shard.IndexShardState; import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.common.settings.Settings;
/** /**
* A global component allowing to register for lifecycle of an index (create/closed) and * A global component allowing to register for lifecycle of an index (create/closed) and
@ -62,7 +64,7 @@ public interface IndicesLifecycle {
/** /**
* Called before the index gets created. * Called before the index gets created.
*/ */
public void beforeIndexCreated(Index index) { public void beforeIndexCreated(Index index, @IndexSettings Settings indexSettings) {
} }
@ -76,7 +78,7 @@ public interface IndicesLifecycle {
/** /**
* Called before the index shard gets created. * Called before the index shard gets created.
*/ */
public void beforeIndexShardCreated(ShardId shardId) { public void beforeIndexShardCreated(ShardId shardId, @IndexSettings Settings indexSettings) {
} }
@ -112,7 +114,7 @@ public interface IndicesLifecycle {
* *
* @param index The index * @param index The index
*/ */
public void afterIndexClosed(Index index) { public void afterIndexClosed(Index index, @IndexSettings Settings indexSettings) {
} }
@ -121,7 +123,8 @@ public interface IndicesLifecycle {
* *
* @param indexShard The index shard * @param indexShard The index shard
*/ */
public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexShard) { public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexShard,
@IndexSettings Settings indexSettings) {
} }
@ -130,7 +133,8 @@ public interface IndicesLifecycle {
* *
* @param shardId The shard id * @param shardId The shard id
*/ */
public void afterIndexShardClosed(ShardId shardId, @Nullable IndexShard indexShard) { public void afterIndexShardClosed(ShardId shardId, @Nullable IndexShard indexShard,
@IndexSettings Settings indexSettings) {
} }
@ -149,12 +153,12 @@ public interface IndicesLifecycle {
/** /**
* Called after the index has been deleted. * Called after the index has been deleted.
* This listener method is invoked after {@link #afterIndexClosed(org.elasticsearch.index.Index)} * This listener method is invoked after {@link #afterIndexClosed(org.elasticsearch.index.Index, org.elasticsearch.common.settings.Settings)}
* when an index is deleted * when an index is deleted
* *
* @param index The index * @param index The index
*/ */
public void afterIndexDeleted(Index index) { public void afterIndexDeleted(Index index, @IndexSettings Settings indexSettings) {
} }

View File

@ -35,7 +35,6 @@ import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.*; import org.elasticsearch.common.inject.*;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.*; import org.elasticsearch.index.*;
import org.elasticsearch.index.aliases.IndexAliasesServiceModule; import org.elasticsearch.index.aliases.IndexAliasesServiceModule;
import org.elasticsearch.index.analysis.AnalysisModule; import org.elasticsearch.index.analysis.AnalysisModule;
@ -70,8 +69,6 @@ import org.elasticsearch.plugins.PluginsService;
import java.io.Closeable; import java.io.Closeable;
import java.io.IOException; import java.io.IOException;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -268,7 +265,7 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService> i
return indexService; return indexService;
} }
public synchronized IndexService createIndex(String sIndexName, Settings settings, String localNodeId) throws ElasticsearchException { public synchronized IndexService createIndex(String sIndexName, @IndexSettings Settings settings, String localNodeId) throws ElasticsearchException {
if (!lifecycle.started()) { if (!lifecycle.started()) {
throw new ElasticsearchIllegalStateException("Can't create an index [" + sIndexName + "], node is closed"); throw new ElasticsearchIllegalStateException("Can't create an index [" + sIndexName + "], node is closed");
} }
@ -277,7 +274,7 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService> i
throw new IndexAlreadyExistsException(index); throw new IndexAlreadyExistsException(index);
} }
indicesLifecycle.beforeIndexCreated(index); indicesLifecycle.beforeIndexCreated(index, settings);
logger.debug("creating Index [{}], shards [{}]/[{}]", sIndexName, settings.get(SETTING_NUMBER_OF_SHARDS), settings.get(SETTING_NUMBER_OF_REPLICAS)); logger.debug("creating Index [{}], shards [{}]/[{}]", sIndexName, settings.get(SETTING_NUMBER_OF_SHARDS), settings.get(SETTING_NUMBER_OF_REPLICAS));
@ -391,9 +388,9 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService> i
indexInjector.getInstance(IndexStore.class).close(); indexInjector.getInstance(IndexStore.class).close();
logger.debug("[{}] closed... (reason [{}])", index, reason); logger.debug("[{}] closed... (reason [{}])", index, reason);
indicesLifecycle.afterIndexClosed(indexService.index()); indicesLifecycle.afterIndexClosed(indexService.index(), indexService.settingsService().getSettings());
if (delete) { if (delete) {
indicesLifecycle.afterIndexDeleted(indexService.index()); indicesLifecycle.afterIndexDeleted(indexService.index(), indexService.settingsService().getSettings());
} }
} catch (IOException ex) { } catch (IOException ex) {
throw new ElasticsearchException("failed to remove index " + index, ex); throw new ElasticsearchException("failed to remove index " + index, ex);
@ -410,7 +407,8 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService> i
final FlushStats flushStats = new FlushStats(); final FlushStats flushStats = new FlushStats();
@Override @Override
public synchronized void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexShard) { public synchronized void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexShard,
@IndexSettings Settings indexSettings) {
if (indexShard != null) { if (indexShard != null) {
getStats.add(indexShard.getStats()); getStats.add(indexShard.getStats());
indexingStats.add(indexShard.indexingStats(), false); indexingStats.add(indexShard.indexingStats(), false);

View File

@ -26,9 +26,10 @@ import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.Index; import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexService; import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardState; import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.IndexShard;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArrayList;
@ -62,10 +63,10 @@ public class InternalIndicesLifecycle extends AbstractComponent implements Indic
} }
} }
public void beforeIndexCreated(Index index) { public void beforeIndexCreated(Index index, @IndexSettings Settings indexSettings) {
for (Listener listener : listeners) { for (Listener listener : listeners) {
try { try {
listener.beforeIndexCreated(index); listener.beforeIndexCreated(index, indexSettings);
} catch (Throwable t) { } catch (Throwable t) {
logger.warn("[{}] failed to invoke before index created callback", t, index.name()); logger.warn("[{}] failed to invoke before index created callback", t, index.name());
} }
@ -82,10 +83,10 @@ public class InternalIndicesLifecycle extends AbstractComponent implements Indic
} }
} }
public void beforeIndexShardCreated(ShardId shardId) { public void beforeIndexShardCreated(ShardId shardId, @IndexSettings Settings indexSettings) {
for (Listener listener : listeners) { for (Listener listener : listeners) {
try { try {
listener.beforeIndexShardCreated(shardId); listener.beforeIndexShardCreated(shardId, indexSettings);
} catch (Throwable t) { } catch (Throwable t) {
logger.warn("{} failed to invoke before shard created callback", t, shardId); logger.warn("{} failed to invoke before shard created callback", t, shardId);
} }
@ -142,40 +143,42 @@ public class InternalIndicesLifecycle extends AbstractComponent implements Indic
} }
} }
public void afterIndexDeleted(Index index) { public void afterIndexDeleted(Index index, @IndexSettings Settings indexSettings) {
for (Listener listener : listeners) { for (Listener listener : listeners) {
try { try {
listener.afterIndexDeleted(index); listener.afterIndexDeleted(index, indexSettings);
} catch (Throwable t) { } catch (Throwable t) {
logger.warn("[{}] failed to invoke after index deleted callback", t, index.name()); logger.warn("[{}] failed to invoke after index deleted callback", t, index.name());
} }
} }
} }
public void afterIndexClosed(Index index) { public void afterIndexClosed(Index index, @IndexSettings Settings indexSettings) {
for (Listener listener : listeners) { for (Listener listener : listeners) {
try { try {
listener.afterIndexClosed(index); listener.afterIndexClosed(index, indexSettings);
} catch (Throwable t) { } catch (Throwable t) {
logger.warn("[{}] failed to invoke after index closed callback", t, index.name()); logger.warn("[{}] failed to invoke after index closed callback", t, index.name());
} }
} }
} }
public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexShard) { public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexShard,
@IndexSettings Settings indexSettings) {
for (Listener listener : listeners) { for (Listener listener : listeners) {
try { try {
listener.beforeIndexShardClosed(shardId, indexShard); listener.beforeIndexShardClosed(shardId, indexShard, indexSettings);
} catch (Throwable t) { } catch (Throwable t) {
logger.warn("{} failed to invoke before shard closed callback", t, shardId); logger.warn("{} failed to invoke before shard closed callback", t, shardId);
} }
} }
} }
public void afterIndexShardClosed(ShardId shardId, @Nullable IndexShard indexShard) { public void afterIndexShardClosed(ShardId shardId, @Nullable IndexShard indexShard,
@IndexSettings Settings indexSettings) {
for (Listener listener : listeners) { for (Listener listener : listeners) {
try { try {
listener.afterIndexShardClosed(shardId, indexShard); listener.afterIndexShardClosed(shardId, indexShard, indexSettings);
} catch (Throwable t) { } catch (Throwable t) {
logger.warn("{} failed to invoke after shard closed callback", t, shardId); logger.warn("{} failed to invoke after shard closed callback", t, shardId);
} }

View File

@ -28,10 +28,10 @@ import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.IndexService; import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesLifecycle; import org.elasticsearch.indices.IndicesLifecycle;
import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
@ -71,7 +71,8 @@ public class RecoverySource extends AbstractComponent {
this.clusterService = clusterService; this.clusterService = clusterService;
this.indicesService.indicesLifecycle().addListener(new IndicesLifecycle.Listener() { this.indicesService.indicesLifecycle().addListener(new IndicesLifecycle.Listener() {
@Override @Override
public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexShard) { public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexShard,
@IndexSettings Settings indexSettings) {
if (indexShard != null) { if (indexShard != null) {
ongoingRecoveries.cancel(indexShard, "shard is closed"); ongoingRecoveries.cancel(indexShard, "shard is closed");
} }

View File

@ -37,6 +37,7 @@ import org.elasticsearch.common.util.CancellableThreads;
import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.index.IndexShardMissingException; import org.elasticsearch.index.IndexShardMissingException;
import org.elasticsearch.index.engine.RecoveryEngineException; import org.elasticsearch.index.engine.RecoveryEngineException;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.shard.*; import org.elasticsearch.index.shard.*;
import org.elasticsearch.index.store.Store; import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.store.StoreFileMetaData; import org.elasticsearch.index.store.StoreFileMetaData;
@ -99,7 +100,8 @@ public class RecoveryTarget extends AbstractComponent {
indicesLifecycle.addListener(new IndicesLifecycle.Listener() { indicesLifecycle.addListener(new IndicesLifecycle.Listener() {
@Override @Override
public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexShard) { public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexShard,
@IndexSettings Settings indexSettings) {
if (indexShard != null) { if (indexShard != null) {
onGoingRecoveries.cancelRecoveriesForShard(shardId, "shard closed"); onGoingRecoveries.cancelRecoveriesForShard(shardId, "shard closed");
} }

View File

@ -24,7 +24,6 @@ import com.carrotsearch.hppc.ObjectSet;
import com.carrotsearch.hppc.cursors.ObjectCursor; import com.carrotsearch.hppc.cursors.ObjectCursor;
import com.google.common.base.Charsets; import com.google.common.base.Charsets;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import org.apache.lucene.index.IndexOptions; import org.apache.lucene.index.IndexOptions;
import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.NumericDocValues; import org.apache.lucene.index.NumericDocValues;
@ -52,6 +51,7 @@ import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.Index; import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.fielddata.FieldDataType; import org.elasticsearch.index.fielddata.FieldDataType;
import org.elasticsearch.index.fielddata.IndexFieldData; import org.elasticsearch.index.fielddata.IndexFieldData;
@ -62,14 +62,14 @@ import org.elasticsearch.index.mapper.FieldMapper.Loading;
import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.query.TemplateQueryParser; import org.elasticsearch.index.query.TemplateQueryParser;
import org.elasticsearch.index.search.stats.StatsGroupsParseElement; import org.elasticsearch.index.search.stats.StatsGroupsParseElement;
import org.elasticsearch.index.IndexService; import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.indices.IndicesLifecycle; import org.elasticsearch.indices.IndicesLifecycle;
import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.cache.query.IndicesQueryCache;
import org.elasticsearch.indices.IndicesWarmer; import org.elasticsearch.indices.IndicesWarmer;
import org.elasticsearch.indices.IndicesWarmer.WarmerContext;
import org.elasticsearch.indices.IndicesWarmer.TerminationHandle; import org.elasticsearch.indices.IndicesWarmer.TerminationHandle;
import org.elasticsearch.indices.IndicesWarmer.WarmerContext;
import org.elasticsearch.indices.cache.query.IndicesQueryCache;
import org.elasticsearch.script.ExecutableScript; import org.elasticsearch.script.ExecutableScript;
import org.elasticsearch.script.ScriptService; import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.dfs.CachedDfSource; import org.elasticsearch.search.dfs.CachedDfSource;
@ -150,7 +150,7 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
indicesService.indicesLifecycle().addListener(new IndicesLifecycle.Listener() { indicesService.indicesLifecycle().addListener(new IndicesLifecycle.Listener() {
@Override @Override
public void afterIndexDeleted(Index index) { public void afterIndexDeleted(Index index, @IndexSettings Settings indexSettings) {
// once an index is closed we can just clean up all the pending search context information // once an index is closed we can just clean up all the pending search context information
// to release memory and let references to the filesystem go etc. // to release memory and let references to the filesystem go etc.
freeAllContextForIndex(index); freeAllContextForIndex(index);

View File

@ -23,7 +23,6 @@ import com.carrotsearch.randomizedtesting.LifecycleScope;
import com.carrotsearch.randomizedtesting.generators.RandomPicks; import com.carrotsearch.randomizedtesting.generators.RandomPicks;
import com.google.common.base.Charsets; import com.google.common.base.Charsets;
import com.google.common.base.Predicate; import com.google.common.base.Predicate;
import org.apache.lucene.codecs.CodecUtil; import org.apache.lucene.codecs.CodecUtil;
import org.apache.lucene.index.CheckIndex; import org.apache.lucene.index.CheckIndex;
import org.apache.lucene.index.IndexFileNames; import org.apache.lucene.index.IndexFileNames;
@ -53,10 +52,11 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.discovery.Discovery; import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.index.engine.EngineConfig; import org.elasticsearch.index.engine.EngineConfig;
import org.elasticsearch.index.merge.policy.MergePolicyModule; import org.elasticsearch.index.merge.policy.MergePolicyModule;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardException; import org.elasticsearch.index.shard.IndexShardException;
import org.elasticsearch.index.shard.IndexShardState; import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.translog.TranslogService; import org.elasticsearch.index.translog.TranslogService;
import org.elasticsearch.indices.IndicesLifecycle; import org.elasticsearch.indices.IndicesLifecycle;
import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.IndicesService;
@ -76,11 +76,7 @@ import java.io.IOException;
import java.io.PrintStream; import java.io.PrintStream;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.channels.FileChannel; import java.nio.channels.FileChannel;
import java.nio.file.DirectoryStream; import java.nio.file.*;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.*; import java.util.*;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
@ -184,7 +180,7 @@ public class CorruptedFileTest extends ElasticsearchIntegrationTest {
final CopyOnWriteArrayList<Throwable> exception = new CopyOnWriteArrayList<>(); final CopyOnWriteArrayList<Throwable> exception = new CopyOnWriteArrayList<>();
final IndicesLifecycle.Listener listener = new IndicesLifecycle.Listener() { final IndicesLifecycle.Listener listener = new IndicesLifecycle.Listener() {
@Override @Override
public void afterIndexShardClosed(ShardId sid, @Nullable IndexShard indexShard) { public void afterIndexShardClosed(ShardId sid, @Nullable IndexShard indexShard, @IndexSettings Settings indexSettings) {
if (indexShard != null) { if (indexShard != null) {
Store store = ((IndexShard) indexShard).store(); Store store = ((IndexShard) indexShard).store();
store.incRef(); store.incRef();

View File

@ -18,8 +18,10 @@
*/ */
package org.elasticsearch.indices; package org.elasticsearch.indices;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.Index; import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexService; import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.test.ElasticsearchSingleNodeTest; import org.elasticsearch.test.ElasticsearchSingleNodeTest;
import org.junit.Test; import org.junit.Test;
@ -45,7 +47,7 @@ public class IndicesLifecycleListenerSingleNodeTests extends ElasticsearchSingle
ensureGreen(); ensureGreen();
getInstanceFromNode(IndicesLifecycle.class).addListener(new IndicesLifecycle.Listener() { getInstanceFromNode(IndicesLifecycle.class).addListener(new IndicesLifecycle.Listener() {
@Override @Override
public void afterIndexClosed(Index index) { public void afterIndexClosed(Index index, @IndexSettings Settings indexSettings) {
assertEquals(counter.get(), 3); assertEquals(counter.get(), 3);
counter.incrementAndGet(); counter.incrementAndGet();
} }
@ -57,7 +59,7 @@ public class IndicesLifecycleListenerSingleNodeTests extends ElasticsearchSingle
} }
@Override @Override
public void afterIndexDeleted(Index index) { public void afterIndexDeleted(Index index, @IndexSettings Settings indexSettings) {
assertEquals(counter.get(), 4); assertEquals(counter.get(), 4);
counter.incrementAndGet(); counter.incrementAndGet();
} }

View File

@ -22,9 +22,13 @@ import com.google.common.base.Predicate;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings; import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardState; import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.test.ElasticsearchIntegrationTest; import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.junit.Test; import org.junit.Test;
@ -42,6 +46,7 @@ import static org.elasticsearch.index.shard.IndexShardState.*;
import static org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope; import static org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope;
import static org.elasticsearch.test.ElasticsearchIntegrationTest.Scope; import static org.elasticsearch.test.ElasticsearchIntegrationTest.Scope;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.CoreMatchers.equalTo;
@ClusterScope(scope = Scope.TEST, numDataNodes = 0) @ClusterScope(scope = Scope.TEST, numDataNodes = 0)
public class IndicesLifecycleListenerTests extends ElasticsearchIntegrationTest { public class IndicesLifecycleListenerTests extends ElasticsearchIntegrationTest {
@ -59,6 +64,8 @@ public class IndicesLifecycleListenerTests extends ElasticsearchIntegrationTest
assertAcked(client().admin().indices().prepareCreate("test") assertAcked(client().admin().indices().prepareCreate("test")
.setSettings(SETTING_NUMBER_OF_SHARDS, 6, SETTING_NUMBER_OF_REPLICAS, 0)); .setSettings(SETTING_NUMBER_OF_SHARDS, 6, SETTING_NUMBER_OF_REPLICAS, 0));
ensureGreen(); ensureGreen();
assertThat(stateChangeListenerNode1.creationSettings.getAsInt(SETTING_NUMBER_OF_SHARDS, -1), equalTo(6));
assertThat(stateChangeListenerNode1.creationSettings.getAsInt(SETTING_NUMBER_OF_REPLICAS, -1), equalTo(0));
//new shards got started //new shards got started
assertShardStatesMatch(stateChangeListenerNode1, 6, CREATED, RECOVERING, POST_RECOVERY, STARTED); assertShardStatesMatch(stateChangeListenerNode1, 6, CREATED, RECOVERING, POST_RECOVERY, STARTED);
@ -97,6 +104,9 @@ public class IndicesLifecycleListenerTests extends ElasticsearchIntegrationTest
//close the index //close the index
assertAcked(client().admin().indices().prepareClose("test")); assertAcked(client().admin().indices().prepareClose("test"));
assertThat(stateChangeListenerNode1.afterCloseSettings.getAsInt(SETTING_NUMBER_OF_SHARDS, -1), equalTo(6));
assertThat(stateChangeListenerNode1.afterCloseSettings.getAsInt(SETTING_NUMBER_OF_REPLICAS, -1), equalTo(0));
assertShardStatesMatch(stateChangeListenerNode1, 6, CLOSED); assertShardStatesMatch(stateChangeListenerNode1, 6, CLOSED);
assertShardStatesMatch(stateChangeListenerNode2, 6, CLOSED); assertShardStatesMatch(stateChangeListenerNode2, 6, CLOSED);
} }
@ -135,6 +145,8 @@ public class IndicesLifecycleListenerTests extends ElasticsearchIntegrationTest
private static class IndexShardStateChangeListener extends IndicesLifecycle.Listener { private static class IndexShardStateChangeListener extends IndicesLifecycle.Listener {
//we keep track of all the states (ordered) a shard goes through //we keep track of all the states (ordered) a shard goes through
final ConcurrentMap<ShardId, List<IndexShardState>> shardStates = Maps.newConcurrentMap(); final ConcurrentMap<ShardId, List<IndexShardState>> shardStates = Maps.newConcurrentMap();
Settings creationSettings = ImmutableSettings.EMPTY;
Settings afterCloseSettings = ImmutableSettings.EMPTY;
@Override @Override
public void indexShardStateChanged(IndexShard indexShard, @Nullable IndexShardState previousState, IndexShardState newState, @Nullable String reason) { public void indexShardStateChanged(IndexShard indexShard, @Nullable IndexShardState previousState, IndexShardState newState, @Nullable String reason) {
@ -145,6 +157,16 @@ public class IndicesLifecycleListenerTests extends ElasticsearchIntegrationTest
} }
} }
@Override
public void beforeIndexCreated(Index index, @IndexSettings Settings indexSettings) {
this.creationSettings = indexSettings;
}
@Override
public void afterIndexShardClosed(ShardId shardId, @Nullable IndexShard indexShard, @IndexSettings Settings indexSettings) {
this.afterCloseSettings = indexSettings;
}
@Override @Override
public String toString() { public String toString() {
StringBuilder sb = new StringBuilder(); StringBuilder sb = new StringBuilder();

View File

@ -77,7 +77,8 @@ public class MockFSDirectoryService extends FsDirectoryService {
boolean canRun = false; boolean canRun = false;
@Override @Override
public void beforeIndexShardClosed(ShardId sid, @Nullable IndexShard indexShard) { public void beforeIndexShardClosed(ShardId sid, @Nullable IndexShard indexShard,
@IndexSettings Settings indexSettings) {
if (indexShard != null && shardId.equals(sid)) { if (indexShard != null && shardId.equals(sid)) {
logger.info("Shard state before potentially flushing is {}", indexShard.state()); logger.info("Shard state before potentially flushing is {}", indexShard.state());
if (validCheckIndexStates.contains(indexShard.state())) { if (validCheckIndexStates.contains(indexShard.state())) {
@ -85,17 +86,18 @@ public class MockFSDirectoryService extends FsDirectoryService {
// When the the internal engine closes we do a rollback, which removes uncommitted segments // When the the internal engine closes we do a rollback, which removes uncommitted segments
// By doing a commit flush we perform a Lucene commit, but don't clear the translog, // By doing a commit flush we perform a Lucene commit, but don't clear the translog,
// so that even in tests where don't flush we can check the integrity of the Lucene index // so that even in tests where don't flush we can check the integrity of the Lucene index
((IndexShard)indexShard).engine().flush(Engine.FlushType.COMMIT, false, true); // Keep translog for tests that rely on replaying it indexShard.engine().flush(Engine.FlushType.COMMIT, false, true); // Keep translog for tests that rely on replaying it
logger.info("flush finished in beforeIndexShardClosed"); logger.info("flush finished in beforeIndexShardClosed");
} }
} }
} }
@Override @Override
public void afterIndexShardClosed(ShardId sid, @Nullable IndexShard indexShard) { public void afterIndexShardClosed(ShardId sid, @Nullable IndexShard indexShard,
@IndexSettings Settings indexSettings) {
if (shardId.equals(sid) && indexShard != null && canRun) { if (shardId.equals(sid) && indexShard != null && canRun) {
assert indexShard.state() == IndexShardState.CLOSED : "Current state must be closed"; assert indexShard.state() == IndexShardState.CLOSED : "Current state must be closed";
checkIndex(((IndexShard) indexShard).store(), sid); checkIndex(indexShard.store(), sid);
} }
service.indicesLifecycle().removeListener(this); service.indicesLifecycle().removeListener(this);
} }