From 6ea89004cded88ca4d0ab9cd96c7c44057e47b4f Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Fri, 10 Jun 2016 12:47:41 +0200 Subject: [PATCH] Make IndicesClusterStateService unit testable (#17270) Testability of ICSS is achieved by introducing interfaces for IndicesService, IndexService and IndexShard. These interfaces extract all relevant methods used by ICSS (which do not deal directly with store) and give the possibility to easily mock all the store behavior away in the tests (and cuts down on dependencies). --- .../resources/checkstyle_suppressions.xml | 1 - .../cluster/ClusterChangedEvent.java | 13 +- .../index/NodeMappingRefreshAction.java | 8 +- .../org/elasticsearch/index/IndexService.java | 14 +- .../elasticsearch/index/IndexSettings.java | 2 +- .../index/mapper/MapperService.java | 43 + .../elasticsearch/index/shard/IndexShard.java | 119 +-- .../index/shard/ShadowIndexShard.java | 6 +- .../elasticsearch/indices/IndicesService.java | 78 +- .../cluster/IndicesClusterStateService.java | 890 ++++++++++-------- .../TransportMasterNodeActionUtils.java | 38 + .../cluster/ClusterChangedEventTests.java | 14 +- .../RandomAllocationDeciderTests.java | 2 +- .../index/shard/IndexShardTests.java | 56 +- .../IndexingMemoryControllerTests.java | 2 +- ...dicesLifecycleListenerSingleNodeTests.java | 4 +- ...actIndicesClusterStateServiceTestCase.java | 318 +++++++ .../indices/cluster/ClusterStateChanges.java | 234 +++++ ...ClusterStateServiceRandomUpdatesTests.java | 281 ++++++ 19 files changed, 1556 insertions(+), 567 deletions(-) create mode 100644 core/src/test/java/org/elasticsearch/action/support/master/TransportMasterNodeActionUtils.java create mode 100644 core/src/test/java/org/elasticsearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java create mode 100644 core/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java create mode 100644 core/src/test/java/org/elasticsearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java diff --git a/buildSrc/src/main/resources/checkstyle_suppressions.xml b/buildSrc/src/main/resources/checkstyle_suppressions.xml index 05ca294a9f0..d56bdeb537f 100644 --- a/buildSrc/src/main/resources/checkstyle_suppressions.xml +++ b/buildSrc/src/main/resources/checkstyle_suppressions.xml @@ -482,7 +482,6 @@ - diff --git a/core/src/main/java/org/elasticsearch/cluster/ClusterChangedEvent.java b/core/src/main/java/org/elasticsearch/cluster/ClusterChangedEvent.java index d3a42a97ebb..efd525d313b 100644 --- a/core/src/main/java/org/elasticsearch/cluster/ClusterChangedEvent.java +++ b/core/src/main/java/org/elasticsearch/cluster/ClusterChangedEvent.java @@ -148,18 +148,11 @@ public class ClusterChangedEvent { * has changed between the previous cluster state and the new cluster state. * Note that this is an object reference equality test, not an equals test. */ - public boolean indexMetaDataChanged(IndexMetaData current) { - MetaData previousMetaData = previousState.metaData(); - if (previousMetaData == null) { - return true; - } - IndexMetaData previousIndexMetaData = previousMetaData.index(current.getIndex()); + public static boolean indexMetaDataChanged(IndexMetaData metaData1, IndexMetaData metaData2) { + assert metaData1 != null && metaData2 != null; // no need to check on version, since disco modules will make sure to use the // same instance if its a version match - if (previousIndexMetaData == current) { - return false; - } - return true; + return metaData1 != metaData2; } /** diff --git a/core/src/main/java/org/elasticsearch/cluster/action/index/NodeMappingRefreshAction.java b/core/src/main/java/org/elasticsearch/cluster/action/index/NodeMappingRefreshAction.java index 0645accb42a..b1bf01018c9 100644 --- a/core/src/main/java/org/elasticsearch/cluster/action/index/NodeMappingRefreshAction.java +++ b/core/src/main/java/org/elasticsearch/cluster/action/index/NodeMappingRefreshAction.java @@ -24,6 +24,7 @@ import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaDataMappingService; +import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; @@ -58,13 +59,12 @@ public class NodeMappingRefreshAction extends AbstractComponent { transportService.registerRequestHandler(ACTION_NAME, NodeMappingRefreshRequest::new, ThreadPool.Names.SAME, new NodeMappingRefreshTransportHandler()); } - public void nodeMappingRefresh(final ClusterState state, final NodeMappingRefreshRequest request) { - final DiscoveryNodes nodes = state.nodes(); - if (nodes.getMasterNode() == null) { + public void nodeMappingRefresh(final DiscoveryNode masterNode, final NodeMappingRefreshRequest request) { + if (masterNode == null) { logger.warn("can't send mapping refresh for [{}], no master known.", request.index()); return; } - transportService.sendRequest(nodes.getMasterNode(), ACTION_NAME, request, EmptyTransportResponseHandler.INSTANCE_SAME); + transportService.sendRequest(masterNode, ACTION_NAME, request, EmptyTransportResponseHandler.INSTANCE_SAME); } private class NodeMappingRefreshTransportHandler implements TransportRequestHandler { diff --git a/core/src/main/java/org/elasticsearch/index/IndexService.java b/core/src/main/java/org/elasticsearch/index/IndexService.java index 5780dc256a5..1902f24c4b7 100644 --- a/core/src/main/java/org/elasticsearch/index/IndexService.java +++ b/core/src/main/java/org/elasticsearch/index/IndexService.java @@ -67,6 +67,7 @@ import org.elasticsearch.index.store.Store; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.indices.AliasFilterParsingException; import org.elasticsearch.indices.InvalidAliasNameException; +import org.elasticsearch.indices.cluster.IndicesClusterStateService; import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache; import org.elasticsearch.indices.mapper.MapperRegistry; import org.elasticsearch.threadpool.ThreadPool; @@ -93,7 +94,7 @@ import static org.elasticsearch.common.collect.MapBuilder.newMapBuilder; /** * */ -public final class IndexService extends AbstractIndexComponent implements IndexComponent, Iterable { +public class IndexService extends AbstractIndexComponent implements IndicesClusterStateService.AllocatedIndex { private final IndexEventListener eventListener; private final AnalysisService analysisService; @@ -184,8 +185,8 @@ public final class IndexService extends AbstractIndexComponent implements IndexC /** * Return the shard with the provided id, or null if there is no such shard. */ - @Nullable - public IndexShard getShardOrNull(int shardId) { + @Override + public @Nullable IndexShard getShardOrNull(int shardId) { return shards.get(shardId); } @@ -359,6 +360,7 @@ public final class IndexService extends AbstractIndexComponent implements IndexC return primary == false && IndexMetaData.isIndexUsingShadowReplicas(indexSettings); } + @Override public synchronized void removeShard(int shardId, String reason) { final ShardId sId = new ShardId(index(), shardId); final IndexShard indexShard; @@ -470,6 +472,11 @@ public final class IndexService extends AbstractIndexComponent implements IndexC return searchOperationListeners; } + @Override + public boolean updateMapping(IndexMetaData indexMetaData) throws IOException { + return mapperService().updateMapping(indexMetaData); + } + private class StoreCloseListener implements Store.OnClose { private final ShardId shardId; private final boolean ownsShard; @@ -617,6 +624,7 @@ public final class IndexService extends AbstractIndexComponent implements IndexC return indexSettings.getIndexMetaData(); } + @Override public synchronized void updateMetaData(final IndexMetaData metadata) { final Translog.Durability oldTranslogDurability = indexSettings.getTranslogDurability(); if (indexSettings.updateIndexMetaData(metadata)) { diff --git a/core/src/main/java/org/elasticsearch/index/IndexSettings.java b/core/src/main/java/org/elasticsearch/index/IndexSettings.java index 6bd6df414af..2c20697d757 100644 --- a/core/src/main/java/org/elasticsearch/index/IndexSettings.java +++ b/core/src/main/java/org/elasticsearch/index/IndexSettings.java @@ -404,7 +404,7 @@ public final class IndexSettings { * * @return true iff any setting has been updated otherwise false. */ - synchronized boolean updateIndexMetaData(IndexMetaData indexMetaData) { + public synchronized boolean updateIndexMetaData(IndexMetaData indexMetaData) { final Settings newSettings = indexMetaData.getSettings(); if (version.equals(Version.indexCreated(newSettings)) == false) { throw new IllegalArgumentException("version mismatch on settings update expected: " + version + " but was: " + Version.indexCreated(newSettings)); diff --git a/core/src/main/java/org/elasticsearch/index/mapper/MapperService.java b/core/src/main/java/org/elasticsearch/index/mapper/MapperService.java index e49846ba6b2..34a0ead0596 100755 --- a/core/src/main/java/org/elasticsearch/index/mapper/MapperService.java +++ b/core/src/main/java/org/elasticsearch/index/mapper/MapperService.java @@ -21,10 +21,13 @@ package org.elasticsearch.index.mapper; import com.carrotsearch.hppc.ObjectHashSet; +import com.carrotsearch.hppc.cursors.ObjectCursor; import org.apache.lucene.analysis.Analyzer; import org.apache.lucene.analysis.DelegatingAnalyzerWrapper; import org.elasticsearch.ElasticsearchGenerationException; import org.elasticsearch.Version; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.common.compress.CompressedXContent; import org.elasticsearch.common.regex.Regex; import org.elasticsearch.common.settings.Setting; @@ -32,6 +35,7 @@ import org.elasticsearch.common.settings.Setting.Property; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.index.AbstractIndexComponent; +import org.elasticsearch.index.IndexService; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.analysis.AnalysisService; import org.elasticsearch.index.mapper.Mapper.BuilderContext; @@ -183,6 +187,45 @@ public class MapperService extends AbstractIndexComponent { } } + public boolean updateMapping(IndexMetaData indexMetaData) throws IOException { + assert indexMetaData.getIndex().equals(index()) : "index mismatch: expected " + index() + " but was " + indexMetaData.getIndex(); + // go over and add the relevant mappings (or update them) + boolean requireRefresh = false; + for (ObjectCursor cursor : indexMetaData.getMappings().values()) { + MappingMetaData mappingMd = cursor.value; + String mappingType = mappingMd.type(); + CompressedXContent mappingSource = mappingMd.source(); + // refresh mapping can happen when the parsing/merging of the mapping from the metadata doesn't result in the same + // mapping, in this case, we send to the master to refresh its own version of the mappings (to conform with the + // merge version of it, which it does when refreshing the mappings), and warn log it. + try { + DocumentMapper existingMapper = documentMapper(mappingType); + + if (existingMapper == null || mappingSource.equals(existingMapper.mappingSource()) == false) { + String op = existingMapper == null ? "adding" : "updating"; + if (logger.isDebugEnabled() && mappingSource.compressed().length < 512) { + logger.debug("[{}] {} mapping [{}], source [{}]", index(), op, mappingType, mappingSource.string()); + } else if (logger.isTraceEnabled()) { + logger.trace("[{}] {} mapping [{}], source [{}]", index(), op, mappingType, mappingSource.string()); + } else { + logger.debug("[{}] {} mapping [{}] (source suppressed due to length, use TRACE level if needed)", index(), op, + mappingType); + } + merge(mappingType, mappingSource, MergeReason.MAPPING_RECOVERY, true); + if (!documentMapper(mappingType).mappingSource().equals(mappingSource)) { + logger.debug("[{}] parsed mapping [{}], and got different sources\noriginal:\n{}\nparsed:\n{}", index(), + mappingType, mappingSource, documentMapper(mappingType).mappingSource()); + requireRefresh = true; + } + } + } catch (Throwable e) { + logger.warn("[{}] failed to add mapping [{}], source [{}]", e, index(), mappingType, mappingSource); + throw e; + } + } + return requireRefresh; + } + //TODO: make this atomic public void merge(Map> mappings, boolean updateAllTypes) throws MapperParsingException { // first, add the default mapping diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 105d1e5b057..16ebd97d8c3 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -37,8 +37,6 @@ import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest; import org.elasticsearch.action.admin.indices.upgrade.post.UpgradeRequest; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MappingMetaData; -import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.routing.RestoreSource; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.common.Booleans; @@ -108,6 +106,7 @@ import org.elasticsearch.index.warmer.ShardIndexWarmerService; import org.elasticsearch.index.warmer.WarmerStats; import org.elasticsearch.indices.IndexingMemoryController; import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.indices.cluster.IndicesClusterStateService; import org.elasticsearch.indices.recovery.RecoveryFailedException; import org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.indices.recovery.RecoveryTargetService; @@ -136,7 +135,7 @@ import java.util.function.Consumer; import java.util.function.BiConsumer; import java.util.stream.Collectors; -public class IndexShard extends AbstractIndexShardComponent { +public class IndexShard extends AbstractIndexShardComponent implements IndicesClusterStateService.Shard { private final ThreadPool threadPool; private final MapperService mapperService; @@ -338,6 +337,7 @@ public class IndexShard extends AbstractIndexShardComponent { /** * Returns the latest cluster routing entry received with this shard. */ + @Override public ShardRouting routingEntry() { return this.shardRouting; } @@ -348,13 +348,12 @@ public class IndexShard extends AbstractIndexShardComponent { /** * Updates the shards routing entry. This mutate the shards internal state depending - * on the changes that get introduced by the new routing value. This method will persist shard level metadata - * unless explicitly disabled. + * on the changes that get introduced by the new routing value. This method will persist shard level metadata. * * @throws IndexShardRelocatedException if shard is marked as relocated and relocation aborted * @throws IOException if shard state could not be persisted */ - public void updateRoutingEntry(final ShardRouting newRouting, final boolean persistState) throws IOException { + public void updateRoutingEntry(final ShardRouting newRouting) throws IOException { final ShardRouting currentRouting = this.shardRouting; if (!newRouting.shardId().equals(shardId())) { throw new IllegalArgumentException("Trying to set a routing entry with shardId " + newRouting.shardId() + " on a shard with shardId " + shardId() + ""); @@ -408,9 +407,7 @@ public class IndexShard extends AbstractIndexShardComponent { } this.shardRouting = newRouting; indexEventListener.shardRoutingChanged(this, currentRouting, newRouting); - if (persistState) { - persistMetadata(newRouting, currentRouting); - } + persistMetadata(newRouting, currentRouting); } /** @@ -589,7 +586,7 @@ public class IndexShard extends AbstractIndexShardComponent { */ public void refresh(String source) { verifyNotClosed(); - + if (canIndex()) { long bytes = getEngine().getIndexBufferRAMBytesUsed(); writingBytes.addAndGet(bytes); @@ -1370,35 +1367,36 @@ public class IndexShard extends AbstractIndexShardComponent { return this.currentEngineReference.get(); } - public void startRecovery(DiscoveryNode localNode, DiscoveryNode sourceNode, RecoveryTargetService recoveryTargetService, + public void startRecovery(RecoveryState recoveryState, RecoveryTargetService recoveryTargetService, RecoveryTargetService.RecoveryListener recoveryListener, RepositoriesService repositoriesService, - BiConsumer mappingUpdateConsumer, IndicesService indicesService) { - final RestoreSource restoreSource = shardRouting.restoreSource(); - - if (shardRouting.isPeerRecovery()) { - assert sourceNode != null : "peer recovery started but sourceNode is null"; - // we don't mark this one as relocated at the end. - // For primaries: requests in any case are routed to both when its relocating and that way we handle - // the edge case where its mark as relocated, and we might need to roll it back... - // For replicas: we are recovering a backup from a primary - RecoveryState.Type type = shardRouting.primary() ? RecoveryState.Type.PRIMARY_RELOCATION : RecoveryState.Type.REPLICA; - RecoveryState recoveryState = new RecoveryState(shardId(), shardRouting.primary(), type, sourceNode, localNode); - try { - markAsRecovering("from " + sourceNode, recoveryState); - recoveryTargetService.startRecovery(this, type, sourceNode, recoveryListener); - } catch (Throwable e) { - failShard("corrupted preexisting index", e); - recoveryListener.onRecoveryFailure(recoveryState, new RecoveryFailedException(shardId, sourceNode, localNode, e), true); - } - } else if (restoreSource == null) { - // recover from filesystem store - - IndexMetaData indexMetaData = indexSettings().getIndexMetaData(); - Index mergeSourceIndex = indexMetaData.getMergeSourceIndex(); - final boolean recoverFromLocalShards = mergeSourceIndex != null && shardRouting.allocatedPostIndexCreate(indexMetaData) == false && shardRouting.primary(); - final RecoveryState recoveryState = new RecoveryState(shardId(), shardRouting.primary(), - recoverFromLocalShards ? RecoveryState.Type.LOCAL_SHARDS : RecoveryState.Type.STORE, localNode, localNode); - if (recoverFromLocalShards) { + BiConsumer mappingUpdateConsumer, + IndicesService indicesService) { + switch (recoveryState.getType()) { + case PRIMARY_RELOCATION: + case REPLICA: + try { + markAsRecovering("from " + recoveryState.getSourceNode(), recoveryState); + recoveryTargetService.startRecovery(this, recoveryState.getType(), recoveryState.getSourceNode(), recoveryListener); + } catch (Throwable e) { + failShard("corrupted preexisting index", e); + recoveryListener.onRecoveryFailure(recoveryState, new RecoveryFailedException(recoveryState, null, e), true); + } + break; + case STORE: + markAsRecovering("from store", recoveryState); // mark the shard as recovering on the cluster state thread + threadPool.generic().execute(() -> { + try { + if (recoverFromStore()) { + recoveryListener.onRecoveryDone(recoveryState); + } + } catch (Throwable t) { + recoveryListener.onRecoveryFailure(recoveryState, new RecoveryFailedException(recoveryState, null, t), true); + } + }); + break; + case LOCAL_SHARDS: + final IndexMetaData indexMetaData = indexSettings().getIndexMetaData(); + final Index mergeSourceIndex = indexMetaData.getMergeSourceIndex(); final List startedShards = new ArrayList<>(); final IndexService sourceIndexService = indicesService.indexService(mergeSourceIndex); final int numShards = sourceIndexService != null ? sourceIndexService.getIndexSettings().getNumberOfShards() : -1; @@ -1414,14 +1412,14 @@ public class IndexShard extends AbstractIndexShardComponent { threadPool.generic().execute(() -> { try { final Set shards = IndexMetaData.selectShrinkShards(shardId().id(), sourceIndexService.getMetaData(), - indexMetaData.getNumberOfShards()); + + indexMetaData.getNumberOfShards()); if (recoverFromLocalShards(mappingUpdateConsumer, startedShards.stream() .filter((s) -> shards.contains(s.shardId())).collect(Collectors.toList()))) { recoveryListener.onRecoveryDone(recoveryState); } } catch (Throwable t) { recoveryListener.onRecoveryFailure(recoveryState, - new RecoveryFailedException(shardId, localNode, localNode, t), true); + new RecoveryFailedException(recoveryState, null, t), true); } }); } else { @@ -1433,36 +1431,25 @@ public class IndexShard extends AbstractIndexShardComponent { + " are started yet, expected " + numShards + " found " + startedShards.size() + " can't recover shard " + shardId()); } - recoveryListener.onRecoveryFailure(recoveryState, new RecoveryFailedException(shardId, localNode, localNode, t), true); + recoveryListener.onRecoveryFailure(recoveryState, new RecoveryFailedException(recoveryState, null, t), true); } - } else { - markAsRecovering("from store", recoveryState); // mark the shard as recovering on the cluster state thread + break; + case SNAPSHOT: + markAsRecovering("from snapshot", recoveryState); // mark the shard as recovering on the cluster state thread threadPool.generic().execute(() -> { try { - if (recoverFromStore()) { + final IndexShardRepository indexShardRepository = repositoriesService.indexShardRepository( + recoveryState.getRestoreSource().snapshot().getRepository()); + if (restoreFromRepository(indexShardRepository)) { recoveryListener.onRecoveryDone(recoveryState); } - } catch (Throwable t) { - recoveryListener.onRecoveryFailure(recoveryState, new RecoveryFailedException(shardId, sourceNode, localNode, t), true); + } catch (Throwable first) { + recoveryListener.onRecoveryFailure(recoveryState, new RecoveryFailedException(recoveryState, null, first), true); } - }); - } - } else { - // recover from a restore - final RecoveryState recoveryState = new RecoveryState(shardId(), shardRouting.primary(), - RecoveryState.Type.SNAPSHOT, shardRouting.restoreSource(), localNode); - markAsRecovering("from snapshot", recoveryState); // mark the shard as recovering on the cluster state thread - threadPool.generic().execute(() -> { - try { - final IndexShardRepository indexShardRepository = repositoriesService.indexShardRepository(restoreSource.snapshot().getRepository()); - if (restoreFromRepository(indexShardRepository)) { - recoveryListener.onRecoveryDone(recoveryState); - } - } catch (Throwable first) { - recoveryListener.onRecoveryFailure(recoveryState, new RecoveryFailedException(shardId, sourceNode, localNode, first), true); - } - }); + break; + default: + throw new IllegalArgumentException("Unknown recovery type " + recoveryState.getType()); } } @@ -1472,7 +1459,7 @@ public class IndexShard extends AbstractIndexShardComponent { // called by the current engine @Override public void onFailedEngine(String reason, @Nullable Throwable failure) { - final ShardFailure shardFailure = new ShardFailure(shardRouting, reason, failure, getIndexUUID()); + final ShardFailure shardFailure = new ShardFailure(shardRouting, reason, failure); for (Callback listener : delegates) { try { listener.handle(shardFailure); @@ -1661,13 +1648,11 @@ public class IndexShard extends AbstractIndexShardComponent { public final String reason; @Nullable public final Throwable cause; - public final String indexUUID; - public ShardFailure(ShardRouting routing, String reason, @Nullable Throwable cause, String indexUUID) { + public ShardFailure(ShardRouting routing, String reason, @Nullable Throwable cause) { this.routing = routing; this.reason = reason; this.cause = cause; - this.indexUUID = indexUUID; } } diff --git a/core/src/main/java/org/elasticsearch/index/shard/ShadowIndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/ShadowIndexShard.java index e22f684637e..e35c95ae1f0 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/ShadowIndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/ShadowIndexShard.java @@ -59,16 +59,16 @@ public final class ShadowIndexShard extends IndexShard { /** * In addition to the regular accounting done in - * {@link IndexShard#updateRoutingEntry(ShardRouting, boolean)}, + * {@link IndexShard#updateRoutingEntry(ShardRouting)}, * if this shadow replica needs to be promoted to a primary, the shard is * failed in order to allow a new primary to be re-allocated. */ @Override - public void updateRoutingEntry(ShardRouting newRouting, boolean persistState) throws IOException { + public void updateRoutingEntry(ShardRouting newRouting) throws IOException { if (newRouting.primary() == true) {// becoming a primary throw new IllegalStateException("can't promote shard to primary"); } - super.updateRoutingEntry(newRouting, persistState); + super.updateRoutingEntry(newRouting); } @Override diff --git a/core/src/main/java/org/elasticsearch/indices/IndicesService.java b/core/src/main/java/org/elasticsearch/indices/IndicesService.java index ba512379868..3ae02c7eadd 100644 --- a/core/src/main/java/org/elasticsearch/indices/IndicesService.java +++ b/core/src/main/java/org/elasticsearch/indices/IndicesService.java @@ -35,6 +35,7 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.MappingMetaData; +import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.breaker.CircuitBreaker; @@ -55,6 +56,7 @@ import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Setting.Property; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.Callback; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.iterable.Iterables; import org.elasticsearch.env.NodeEnvironment; @@ -86,10 +88,14 @@ import org.elasticsearch.index.shard.IndexingStats; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.store.IndexStoreConfig; import org.elasticsearch.indices.breaker.CircuitBreakerService; +import org.elasticsearch.indices.cluster.IndicesClusterStateService; import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache; import org.elasticsearch.indices.mapper.MapperRegistry; import org.elasticsearch.indices.query.IndicesQueriesRegistry; +import org.elasticsearch.indices.recovery.RecoveryState; +import org.elasticsearch.indices.recovery.RecoveryTargetService; import org.elasticsearch.plugins.PluginsService; +import org.elasticsearch.repositories.RepositoriesService; import org.elasticsearch.search.internal.SearchContext; import org.elasticsearch.search.internal.ShardSearchRequest; import org.elasticsearch.search.query.QueryPhase; @@ -124,7 +130,8 @@ import static org.elasticsearch.common.util.CollectionUtils.arrayAsArrayList; /** * */ -public class IndicesService extends AbstractLifecycleComponent implements Iterable, IndexService.ShardStoreDeleter { +public class IndicesService extends AbstractLifecycleComponent + implements IndicesClusterStateService.AllocatedIndices, IndexService.ShardStoreDeleter { public static final String INDICES_SHARDS_CLOSED_TIMEOUT = "indices.shards_closed_timeout"; public static final Setting INDICES_CACHE_CLEAN_INTERVAL_SETTING = @@ -296,11 +303,14 @@ public class IndicesService extends AbstractLifecycleComponent i } /** - * Returns true if changes (adding / removing) indices, shards and so on are allowed. + * Checks if changes (adding / removing) indices, shards and so on are allowed. + * + * @throws IllegalStateException if no changes allowed. */ - public boolean changesAllowed() { - // we check on stop here since we defined stop when we delete the indices - return lifecycle.started(); + private void ensureChangesAllowed() { + if (lifecycle.started() == false) { + throw new IllegalStateException("Can't make changes to indices service, node is closed"); + } } @Override @@ -314,10 +324,9 @@ public class IndicesService extends AbstractLifecycleComponent i /** * Returns an IndexService for the specified index if exists otherwise returns null. - * */ - @Nullable - public IndexService indexService(Index index) { + @Override + public @Nullable IndexService indexService(Index index) { return indices.get(index.getUUID()); } @@ -339,11 +348,9 @@ public class IndicesService extends AbstractLifecycleComponent i * @param builtInListeners a list of built-in lifecycle {@link IndexEventListener} that should should be used along side with the per-index listeners * @throws IndexAlreadyExistsException if the index already exists. */ + @Override public synchronized IndexService createIndex(final NodeServicesProvider nodeServicesProvider, IndexMetaData indexMetaData, List builtInListeners) throws IOException { - - if (!lifecycle.started()) { - throw new IllegalStateException("Can't create an index [" + indexMetaData.getIndex() + "], node is closed"); - } + ensureChangesAllowed(); if (indexMetaData.getIndexUUID().equals(IndexMetaData.INDEX_UUID_NA_VALUE)) { throw new IllegalArgumentException("index must have a real UUID found value: [" + indexMetaData.getIndexUUID() + "]"); } @@ -424,14 +431,44 @@ public class IndicesService extends AbstractLifecycleComponent i } } + @Override + public IndexShard createShard(ShardRouting shardRouting, RecoveryState recoveryState, RecoveryTargetService recoveryTargetService, + RecoveryTargetService.RecoveryListener recoveryListener, RepositoriesService repositoriesService, + NodeServicesProvider nodeServicesProvider, Callback onShardFailure) throws IOException { + ensureChangesAllowed(); + IndexService indexService = indexService(shardRouting.index()); + IndexShard indexShard = indexService.createShard(shardRouting); + indexShard.addShardFailureCallback(onShardFailure); + indexShard.startRecovery(recoveryState, recoveryTargetService, recoveryListener, repositoriesService, + (type, mapping) -> { + assert recoveryState.getType() == RecoveryState.Type.LOCAL_SHARDS : + "mapping update consumer only required by local shards recovery"; + try { + nodeServicesProvider.getClient().admin().indices().preparePutMapping() + .setConcreteIndex(shardRouting.index()) // concrete index - no name clash, it uses uuid + .setType(type) + .setSource(mapping.source().string()) + .get(); + } catch (IOException ex) { + throw new ElasticsearchException("failed to stringify mapping source", ex); + } + }, this); + return indexShard; + } + /** * Removes the given index from this service and releases all associated resources. Persistent parts of the index * like the shards files, state and transaction logs are kept around in the case of a disaster recovery. * @param index the index to remove * @param reason the high level reason causing this removal */ + @Override public void removeIndex(Index index, String reason) { - removeIndex(index, reason, false); + try { + removeIndex(index, reason, false); + } catch (Throwable e) { + logger.warn("failed to remove index ({})", e, reason); + } } private void removeIndex(Index index, String reason, boolean delete) { @@ -516,14 +553,20 @@ public class IndicesService extends AbstractLifecycleComponent i * @param index the index to delete * @param reason the high level reason causing this delete */ - public void deleteIndex(Index index, String reason) throws IOException { - removeIndex(index, reason, true); + @Override + public void deleteIndex(Index index, String reason) { + try { + removeIndex(index, reason, true); + } catch (Throwable e) { + logger.warn("failed to delete index ({})", e, reason); + } } /** * Deletes an index that is not assigned to this node. This method cleans up all disk folders relating to the index * but does not deal with in-memory structures. For those call {@link #deleteIndex(Index, String)} */ + @Override public void deleteUnassignedIndex(String reason, IndexMetaData metaData, ClusterState clusterState) { if (nodeEnv.hasNodeFile()) { String indexName = metaData.getIndex().getName(); @@ -683,8 +726,8 @@ public class IndicesService extends AbstractLifecycleComponent i * @param clusterState {@code ClusterState} to ensure the index is not part of it * @return IndexMetaData for the index loaded from disk */ - @Nullable - public IndexMetaData verifyIndexIsDeleted(final Index index, final ClusterState clusterState) { + @Override + public @Nullable IndexMetaData verifyIndexIsDeleted(final Index index, final ClusterState clusterState) { // this method should only be called when we know the index (name + uuid) is not part of the cluster state if (clusterState.metaData().index(index) != null) { throw new IllegalStateException("Cannot delete index [" + index + "], it is still part of the cluster state."); @@ -839,6 +882,7 @@ public class IndicesService extends AbstractLifecycleComponent i * @param index the index to process the pending deletes for * @param timeout the timeout used for processing pending deletes */ + @Override public void processPendingDeletes(Index index, IndexSettings indexSettings, TimeValue timeout) throws IOException, InterruptedException { logger.debug("{} processing pending deletes", index); final long startTimeNS = System.nanoTime(); diff --git a/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java b/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java index 2c77f863c47..c0bfedc47ca 100644 --- a/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java +++ b/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java @@ -19,16 +19,13 @@ package org.elasticsearch.indices.cluster; -import com.carrotsearch.hppc.cursors.ObjectCursor; import org.apache.lucene.store.LockObtainFailedException; -import org.elasticsearch.ElasticsearchException; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.cluster.action.index.NodeMappingRefreshAction; import org.elasticsearch.cluster.action.shard.ShardStateAction; import org.elasticsearch.cluster.metadata.IndexMetaData; -import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.RoutingNode; @@ -37,7 +34,6 @@ import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.component.AbstractLifecycleComponent; -import org.elasticsearch.common.compress.CompressedXContent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.lucene.Lucene; @@ -48,17 +44,18 @@ import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.gateway.GatewayService; import org.elasticsearch.index.Index; +import org.elasticsearch.index.IndexComponent; import org.elasticsearch.index.IndexService; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.IndexShardAlreadyExistsException; import org.elasticsearch.index.NodeServicesProvider; -import org.elasticsearch.index.mapper.DocumentMapper; -import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.shard.IndexEventListener; import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.shard.IndexShardRelocatedException; import org.elasticsearch.index.shard.IndexShardState; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardNotFoundException; +import org.elasticsearch.indices.IndexAlreadyExistsException; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.flush.SyncedFlushService; import org.elasticsearch.indices.recovery.RecoveryFailedException; @@ -73,7 +70,6 @@ import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -82,14 +78,13 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; /** * */ public class IndicesClusterStateService extends AbstractLifecycleComponent implements ClusterStateListener { - private final IndicesService indicesService; + final AllocatedIndices> indicesService; private final ClusterService clusterService; private final ThreadPool threadPool; private final RecoveryTargetService recoveryTargetService; @@ -102,11 +97,10 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent failedShards = ConcurrentCollections.newConcurrentMap(); + final ConcurrentMap failedShardsCache = ConcurrentCollections.newConcurrentMap(); private final RestoreService restoreService; private final RepositoriesService repositoriesService; - private final Object mutex = new Object(); private final FailedShardHandler failedShardHandler = new FailedShardHandler(); private final boolean sendRefreshMapping; @@ -120,6 +114,22 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent>) indicesService, + clusterService, threadPool, recoveryTargetService, shardStateAction, + nodeMappingRefreshAction, repositoriesService, restoreService, searchService, syncedFlushService, recoverySource, + nodeServicesProvider); + } + + // for tests + IndicesClusterStateService(Settings settings, + AllocatedIndices> indicesService, + ClusterService clusterService, + ThreadPool threadPool, RecoveryTargetService recoveryTargetService, + ShardStateAction shardStateAction, + NodeMappingRefreshAction nodeMappingRefreshAction, + RepositoriesService repositoriesService, RestoreService restoreService, + SearchService searchService, SyncedFlushService syncedFlushService, + RecoverySource recoverySource, NodeServicesProvider nodeServicesProvider) { super(settings); this.buildInIndexListener = Arrays.asList(recoverySource, recoveryTargetService, searchService, syncedFlushService); this.indicesService = indicesService; @@ -149,87 +159,97 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent indexService : indicesService) { + indicesService.removeIndex(indexService.index(), "cleaning index (disabled block persistence)"); // also cleans shards } - - cleanFailedShards(event); - - // cleaning up indices that are completely deleted so we won't need to worry about them - // when checking for shards - applyDeletedIndices(event); - applyDeletedShards(event); - // call after deleted shards so indices with no shards will be cleaned - applyCleanedIndices(event); - // make sure that newly created shards use the latest meta data - applyIndexMetaData(event); - applyNewIndices(event); - // apply mappings also updates new indices. TODO: make new indices good to begin with - applyMappings(event); - applyNewOrUpdatedShards(event); - } - } - - private void cleanFailedShards(final ClusterChangedEvent event) { - RoutingNode routingNode = event.state().getRoutingNodes().node(event.state().nodes().getLocalNodeId()); - if (routingNode == null) { - failedShards.clear(); return; } - for (Iterator> iterator = failedShards.entrySet().iterator(); iterator.hasNext(); ) { - Map.Entry entry = iterator.next(); - ShardRouting failedShardRouting = entry.getValue(); - ShardRouting matchedShardRouting = routingNode.getByShardId(failedShardRouting.shardId()); - if (matchedShardRouting == null || matchedShardRouting.isSameAllocation(failedShardRouting) == false) { + + updateFailedShardsCache(state); + + deleteIndices(event); // also deletes shards of deleted indices + + removeUnallocatedIndices(state); // also removes shards of removed indices + + failMissingShards(state); + + removeShards(state); + + updateIndices(event); // can also fail shards, but these are then guaranteed to be in failedShardsCache + + createIndices(state); + + createOrUpdateShards(state); + } + + /** + * Removes shard entries from the failed shards cache that are no longer allocated to this node by the master. + * Sends shard failures for shards that are marked as actively allocated to this node but don't actually exist on the node. + * Resends shard failures for shards that are still marked as allocated to this node but previously failed. + * + * @param state new cluster state + */ + private void updateFailedShardsCache(final ClusterState state) { + RoutingNode localRoutingNode = state.getRoutingNodes().node(state.nodes().getLocalNodeId()); + if (localRoutingNode == null) { + failedShardsCache.clear(); + return; + } + + DiscoveryNode masterNode = state.nodes().getMasterNode(); + + // remove items from cache which are not in our routing table anymore and resend failures that have not executed on master yet + for (Iterator> iterator = failedShardsCache.entrySet().iterator(); iterator.hasNext(); ) { + ShardRouting failedShardRouting = iterator.next().getValue(); + ShardRouting matchedRouting = localRoutingNode.getByShardId(failedShardRouting.shardId()); + if (matchedRouting == null || matchedRouting.isSameAllocation(failedShardRouting) == false) { iterator.remove(); + } else { + if (masterNode != null) { // TODO: can we remove this? Is resending shard failures the responsibility of shardStateAction? + String message = "master " + masterNode + " has not removed previously failed shard. resending shard failure"; + logger.trace("[{}] re-sending failed shard [{}], reason [{}]", matchedRouting.shardId(), matchedRouting, message); + shardStateAction.shardFailed(matchedRouting, matchedRouting, message, null, SHARD_STATE_ACTION_LISTENER); + } } } } - private void applyDeletedIndices(final ClusterChangedEvent event) { + /** + * Deletes indices (with shard data). + * + * @param event cluster change event + */ + private void deleteIndices(final ClusterChangedEvent event) { final ClusterState previousState = event.previousState(); - final String localNodeId = event.state().nodes().getLocalNodeId(); + final ClusterState state = event.state(); + final String localNodeId = state.nodes().getLocalNodeId(); assert localNodeId != null; for (Index index : event.indicesDeleted()) { if (logger.isDebugEnabled()) { logger.debug("[{}] cleaning index, no longer part of the metadata", index); } - final IndexService idxService = indicesService.indexService(index); + AllocatedIndex indexService = indicesService.indexService(index); final IndexSettings indexSettings; - if (idxService != null) { - indexSettings = idxService.getIndexSettings(); - deleteIndex(index, "index no longer part of the metadata"); + if (indexService != null) { + indexSettings = indexService.getIndexSettings(); + indicesService.deleteIndex(index, "index no longer part of the metadata"); } else if (previousState.metaData().hasIndex(index.getName())) { // The deleted index was part of the previous cluster state, but not loaded on the local node final IndexMetaData metaData = previousState.metaData().index(index); indexSettings = new IndexSettings(metaData, settings); - indicesService.deleteUnassignedIndex("deleted index was not assigned to local node", metaData, event.state()); + indicesService.deleteUnassignedIndex("deleted index was not assigned to local node", metaData, state); } else { // The previous cluster state's metadata also does not contain the index, // which is what happens on node startup when an index was deleted while the @@ -255,10 +275,10 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent indexService : indicesService) { + Index index = indexService.index(); + IndexMetaData indexMetaData = event.state().metaData().index(index); if (indexMetaData == null) { - assert false : "index" + indexService.index() + " exists locally, doesn't have a metadata but is not part " + assert false : "index" + index + " exists locally, doesn't have a metadata but is not part" + " of the delete index list. \nprevious state: " + event.previousState().prettyPrint() + "\n current state:\n" + event.state().prettyPrint(); - logger.warn("[{}] isn't part of metadata but is part of in memory structures. removing", - indexService.index()); - deleteIndex(indexService.index(), "isn't part of metadata (explicit check)"); + logger.warn("[{}] isn't part of metadata but is part of in memory structures. removing", index); + indicesService.deleteIndex(index, "isn't part of metadata (explicit check)"); } } } - private void applyDeletedShards(final ClusterChangedEvent event) { - RoutingNode routingNode = event.state().getRoutingNodes().node(event.state().nodes().getLocalNodeId()); - if (routingNode == null) { + /** + * Removes indices that have no shards allocated to this node. This does not delete the shard data as we wait for enough + * shard copies to exist in the cluster before deleting shard data (triggered by {@link org.elasticsearch.indices.store.IndicesStore}). + * + * @param state new cluster state + */ + private void removeUnallocatedIndices(final ClusterState state) { + final String localNodeId = state.nodes().getLocalNodeId(); + assert localNodeId != null; + + Set indicesWithShards = new HashSet<>(); + RoutingNode localRoutingNode = state.getRoutingNodes().node(localNodeId); + if (localRoutingNode != null) { // null e.g. if we are not a data node + for (ShardRouting shardRouting : localRoutingNode) { + indicesWithShards.add(shardRouting.index()); + } + } + + for (AllocatedIndex indexService : indicesService) { + Index index = indexService.index(); + if (indicesWithShards.contains(index) == false) { + logger.debug("{} removing index, no shards allocated", index); + indicesService.removeIndex(index, "removing index (no shards allocated)"); + } + } + } + + /** + * Notifies master about shards that don't exist but are supposed to be active on this node. + * + * @param state new cluster state + */ + private void failMissingShards(final ClusterState state) { + RoutingNode localRoutingNode = state.getRoutingNodes().node(state.nodes().getLocalNodeId()); + if (localRoutingNode == null) { return; } - - final Map> shardsByIndex = new HashMap<>(); - for (ShardRouting shard : routingNode) { - shardsByIndex.computeIfAbsent(shard.index(), k -> new HashSet<>()).add(shard.allocationId().getId()); + for (final ShardRouting shardRouting : localRoutingNode) { + ShardId shardId = shardRouting.shardId(); + if (shardRouting.initializing() == false && + failedShardsCache.containsKey(shardId) == false && + indicesService.getShardOrNull(shardId) == null) { + // the master thinks we are active, but we don't have this shard at all, mark it as failed + sendFailShard(shardRouting, "master marked shard as active, but shard has not been created, mark shard as failed", null); + } } + } - for (IndexService indexService : indicesService) { - Index index = indexService.index(); - IndexMetaData indexMetaData = event.state().metaData().index(index); - assert indexMetaData != null : "local index doesn't have metadata, should have been cleaned up by applyDeletedIndices: " + index; - // now, go over and delete shards that needs to get deleted - Set newShardAllocationIds = shardsByIndex.getOrDefault(index, Collections.emptySet()); - for (IndexShard existingShard : indexService) { - if (newShardAllocationIds.contains(existingShard.routingEntry().allocationId().getId()) == false) { - if (indexMetaData.getState() == IndexMetaData.State.CLOSE) { - if (logger.isDebugEnabled()) { - logger.debug("{} removing shard (index is closed)", existingShard.shardId()); + /** + * Removes shards that are currently loaded by indicesService but have disappeared from the routing table of the current node. + * Also removes shards where the recovery source node has changed. + * This method does not delete the shard data. + * + * @param state new cluster state + */ + private void removeShards(final ClusterState state) { + final RoutingTable routingTable = state.routingTable(); + final DiscoveryNodes nodes = state.nodes(); + final String localNodeId = state.nodes().getLocalNodeId(); + assert localNodeId != null; + + // remove shards based on routing nodes (no deletion of data) + RoutingNode localRoutingNode = state.getRoutingNodes().node(localNodeId); + for (AllocatedIndex indexService : indicesService) { + for (Shard shard : indexService) { + ShardRouting currentRoutingEntry = shard.routingEntry(); + ShardId shardId = currentRoutingEntry.shardId(); + ShardRouting newShardRouting = localRoutingNode == null ? null : localRoutingNode.getByShardId(shardId); + if (newShardRouting == null || newShardRouting.isSameAllocation(currentRoutingEntry) == false) { + // we can just remove the shard without cleaning it locally, since we will clean it in IndicesStore + // once all shards are allocated + logger.debug("{} removing shard (not allocated)", shardId); + indexService.removeShard(shardId.id(), "removing shard (not allocated)"); + } else { + // remove shards where recovery source has changed. This re-initializes shards later in createOrUpdateShards + if (newShardRouting.isPeerRecovery()) { + RecoveryState recoveryState = shard.recoveryState(); + final DiscoveryNode sourceNode = findSourceNodeForPeerRecovery(logger, routingTable, nodes, newShardRouting); + if (recoveryState.getSourceNode().equals(sourceNode) == false) { + if (recoveryTargetService.cancelRecoveriesForShard(shardId, "recovery source node changed")) { + // getting here means that the shard was still recovering + logger.debug("{} removing shard (recovery source changed), current [{}], global [{}])", + shardId, currentRoutingEntry, newShardRouting); + indexService.removeShard(shardId.id(), "removing shard (recovery source node changed)"); + } } - indexService.removeShard(existingShard.shardId().id(), "removing shard (index is closed)"); - } else { - // we can just remove the shard, without cleaning it locally, since we will clean it - // when all shards are allocated in the IndicesStore - if (logger.isDebugEnabled()) { - logger.debug("{} removing shard (not allocated)", existingShard.shardId()); - } - indexService.removeShard(existingShard.shardId().id(), "removing shard (not allocated)"); } } } } } - private void applyCleanedIndices(final ClusterChangedEvent event) { - // handle closed indices, since they are not allocated on a node once they are closed - // so applyDeletedIndices might not take them into account - for (IndexService indexService : indicesService) { - Index index = indexService.index(); - IndexMetaData indexMetaData = event.state().metaData().index(index); - if (indexMetaData != null && indexMetaData.getState() == IndexMetaData.State.CLOSE) { - for (Integer shardId : indexService.shardIds()) { - logger.debug("{}[{}] removing shard (index is closed)", index, shardId); - try { - indexService.removeShard(shardId, "removing shard (index is closed)"); - } catch (Throwable e) { - logger.warn("{} failed to remove shard (index is closed)", e, index); - } - } - } - } - - final Set hasAllocations = new HashSet<>(); - final RoutingNode node = event.state().getRoutingNodes().node(event.state().nodes().getLocalNodeId()); - // if no shards are allocated ie. if this node is a master-only node it can return nul - if (node != null) { - for (ShardRouting routing : node) { - hasAllocations.add(routing.index()); - } - } - for (IndexService indexService : indicesService) { - Index index = indexService.index(); - if (hasAllocations.contains(index) == false) { - assert indexService.shardIds().isEmpty() : - "no locally assigned shards, but index wasn't emptied by applyDeletedShards." - + " index " + index + ", shards: " + indexService.shardIds(); - if (logger.isDebugEnabled()) { - logger.debug("{} cleaning index (no shards allocated)", index); - } - // clean the index - removeIndex(index, "removing index (no shards allocated)"); - } - } - } - - private void applyIndexMetaData(ClusterChangedEvent event) { - if (!event.metaDataChanged()) { - return; - } - for (IndexMetaData indexMetaData : event.state().metaData()) { - if (!indicesService.hasIndex(indexMetaData.getIndex())) { - // we only create / update here - continue; - } - // if the index meta data didn't change, no need check for refreshed settings - if (!event.indexMetaDataChanged(indexMetaData)) { - continue; - } - Index index = indexMetaData.getIndex(); - IndexService indexService = indicesService.indexService(index); - if (indexService == null) { - // already deleted on us, ignore it - continue; - } - indexService.updateMetaData(indexMetaData); - } - } - - private void applyNewIndices(final ClusterChangedEvent event) { + private void createIndices(final ClusterState state) { // we only create indices for shards that are allocated - RoutingNode routingNode = event.state().getRoutingNodes().node(event.state().nodes().getLocalNodeId()); - if (routingNode == null) { + RoutingNode localRoutingNode = state.getRoutingNodes().node(state.nodes().getLocalNodeId()); + if (localRoutingNode == null) { return; } - for (ShardRouting shard : routingNode) { - if (!indicesService.hasIndex(shard.index())) { - final IndexMetaData indexMetaData = event.state().metaData().getIndexSafe(shard.index()); - if (logger.isDebugEnabled()) { - logger.debug("[{}] creating index", indexMetaData.getIndex()); - } - try { - indicesService.createIndex(nodeServicesProvider, indexMetaData, buildInIndexListener); - } catch (Throwable e) { - sendFailShard(shard, "failed to create index", e); + // create map of indices to create with shards to fail if index creation fails + final Map> indicesToCreate = new HashMap<>(); + for (ShardRouting shardRouting : localRoutingNode) { + if (failedShardsCache.containsKey(shardRouting.shardId()) == false) { + final Index index = shardRouting.index(); + if (indicesService.indexService(index) == null) { + indicesToCreate.computeIfAbsent(index, k -> new ArrayList<>()).add(shardRouting); } } } - } - private void applyMappings(ClusterChangedEvent event) { - // go over and update mappings - for (IndexMetaData indexMetaData : event.state().metaData()) { - Index index = indexMetaData.getIndex(); - if (!indicesService.hasIndex(index)) { - // we only create / update here - continue; - } - boolean requireRefresh = false; - IndexService indexService = indicesService.indexService(index); - if (indexService == null) { - // got deleted on us, ignore (closing the node) - return; - } + for (Map.Entry> entry : indicesToCreate.entrySet()) { + final Index index = entry.getKey(); + final IndexMetaData indexMetaData = state.metaData().index(index); + logger.debug("[{}] creating index", index); + + AllocatedIndex indexService = null; try { - MapperService mapperService = indexService.mapperService(); - // go over and add the relevant mappings (or update them) - for (ObjectCursor cursor : indexMetaData.getMappings().values()) { - MappingMetaData mappingMd = cursor.value; - String mappingType = mappingMd.type(); - CompressedXContent mappingSource = mappingMd.source(); - requireRefresh |= processMapping(index.getName(), mapperService, mappingType, mappingSource); - } - if (requireRefresh && sendRefreshMapping) { - nodeMappingRefreshAction.nodeMappingRefresh(event.state(), - new NodeMappingRefreshAction.NodeMappingRefreshRequest(index.getName(), indexMetaData.getIndexUUID(), - event.state().nodes().getLocalNodeId()) + indexService = indicesService.createIndex(nodeServicesProvider, indexMetaData, buildInIndexListener); + if (indexService.updateMapping(indexMetaData) && sendRefreshMapping) { + nodeMappingRefreshAction.nodeMappingRefresh(state.nodes().getMasterNode(), + new NodeMappingRefreshAction.NodeMappingRefreshRequest(indexMetaData.getIndex().getName(), + indexMetaData.getIndexUUID(), state.nodes().getLocalNodeId()) ); } } catch (Throwable t) { - // if we failed the mappings anywhere, we need to fail the shards for this index, note, we safeguard - // by creating the processing the mappings on the master, or on the node the mapping was introduced on, - // so this failure typically means wrong node level configuration or something similar - for (IndexShard indexShard : indexService) { - ShardRouting shardRouting = indexShard.routingEntry(); - failAndRemoveShard(shardRouting, indexService, true, "failed to update mappings", t); - } - } - } - } - - private boolean processMapping(String index, MapperService mapperService, String mappingType, CompressedXContent mappingSource) throws Throwable { - // refresh mapping can happen when the parsing/merging of the mapping from the metadata doesn't result in the same - // mapping, in this case, we send to the master to refresh its own version of the mappings (to conform with the - // merge version of it, which it does when refreshing the mappings), and warn log it. - boolean requiresRefresh = false; - try { - DocumentMapper existingMapper = mapperService.documentMapper(mappingType); - - if (existingMapper == null || mappingSource.equals(existingMapper.mappingSource()) == false) { - String op = existingMapper == null ? "adding" : "updating"; - if (logger.isDebugEnabled() && mappingSource.compressed().length < 512) { - logger.debug("[{}] {} mapping [{}], source [{}]", index, op, mappingType, mappingSource.string()); - } else if (logger.isTraceEnabled()) { - logger.trace("[{}] {} mapping [{}], source [{}]", index, op, mappingType, mappingSource.string()); + final String failShardReason; + if (indexService == null) { + failShardReason = "failed to create index"; } else { - logger.debug("[{}] {} mapping [{}] (source suppressed due to length, use TRACE level if needed)", index, op, mappingType); + failShardReason = "failed to update mapping for index"; + indicesService.removeIndex(index, "removing index (mapping update failed)"); } - mapperService.merge(mappingType, mappingSource, MapperService.MergeReason.MAPPING_RECOVERY, true); - if (!mapperService.documentMapper(mappingType).mappingSource().equals(mappingSource)) { - logger.debug("[{}] parsed mapping [{}], and got different sources\noriginal:\n{}\nparsed:\n{}", index, mappingType, mappingSource, mapperService.documentMapper(mappingType).mappingSource()); - requiresRefresh = true; + for (ShardRouting shardRouting : entry.getValue()) { + sendFailShard(shardRouting, failShardReason, t); } } - } catch (Throwable e) { - logger.warn("[{}] failed to add mapping [{}], source [{}]", e, index, mappingType, mappingSource); - throw e; } - return requiresRefresh; } - - private void applyNewOrUpdatedShards(final ClusterChangedEvent event) { - if (!indicesService.changesAllowed()) { + private void updateIndices(ClusterChangedEvent event) { + if (!event.metaDataChanged()) { return; } - - RoutingTable routingTable = event.state().routingTable(); - RoutingNode routingNode = event.state().getRoutingNodes().node(event.state().nodes().getLocalNodeId()); - - if (routingNode == null) { - failedShards.clear(); - return; - } - - DiscoveryNodes nodes = event.state().nodes(); - for (final ShardRouting shardRouting : routingNode) { - final IndexService indexService = indicesService.indexService(shardRouting.index()); - if (indexService == null) { - // creation failed for some reasons - assert failedShards.containsKey(shardRouting.shardId()) : - "index has local allocation but is not created by applyNewIndices and is not failed " + shardRouting; - continue; - } - final IndexMetaData indexMetaData = event.state().metaData().index(shardRouting.index()); - assert indexMetaData != null : "index has local allocation but no meta data. " + shardRouting.index(); - - final int shardId = shardRouting.id(); - - if (!indexService.hasShard(shardId) && shardRouting.started()) { - if (failedShards.containsKey(shardRouting.shardId())) { - if (nodes.getMasterNode() != null) { - String message = "master " + nodes.getMasterNode() + " marked shard as started, but shard has previous failed. resending shard failure"; - logger.trace("[{}] re-sending failed shard [{}], reason [{}]", shardRouting.shardId(), shardRouting, message); - shardStateAction.shardFailed(shardRouting, shardRouting, message, null, SHARD_STATE_ACTION_LISTENER); + final ClusterState state = event.state(); + for (AllocatedIndex indexService : indicesService) { + final Index index = indexService.index(); + final IndexMetaData currentIndexMetaData = indexService.getIndexSettings().getIndexMetaData(); + final IndexMetaData newIndexMetaData = state.metaData().index(index); + assert newIndexMetaData != null : "index " + index + " should have been removed by deleteIndices"; + if (ClusterChangedEvent.indexMetaDataChanged(currentIndexMetaData, newIndexMetaData)) { + indexService.updateMetaData(newIndexMetaData); + try { + if (indexService.updateMapping(newIndexMetaData) && sendRefreshMapping) { + nodeMappingRefreshAction.nodeMappingRefresh(state.nodes().getMasterNode(), + new NodeMappingRefreshAction.NodeMappingRefreshRequest(newIndexMetaData.getIndex().getName(), + newIndexMetaData.getIndexUUID(), state.nodes().getLocalNodeId()) + ); } - } else { - // the master thinks we are started, but we don't have this shard at all, mark it as failed - sendFailShard(shardRouting, "master [" + nodes.getMasterNode() + "] marked shard as started, but shard has not been created, mark shard as failed", null); - } - continue; - } + } catch (Throwable t) { + indicesService.removeIndex(indexService.index(), "removing index (mapping update failed)"); - IndexShard indexShard = indexService.getShardOrNull(shardId); - if (indexShard != null) { - ShardRouting currentRoutingEntry = indexShard.routingEntry(); - // if the current and global routing are initializing, but are still not the same, its a different "shard" being allocated - // for example: a shard that recovers from one node and now needs to recover to another node, - // or a replica allocated and then allocating a primary because the primary failed on another node - boolean shardHasBeenRemoved = false; - assert currentRoutingEntry.isSameAllocation(shardRouting) : - "local shard has a different allocation id but wasn't cleaning by applyDeletedShards. " - + "cluster state: " + shardRouting + " local: " + currentRoutingEntry; - if (shardRouting.isPeerRecovery()) { - RecoveryState recoveryState = indexShard.recoveryState(); - final DiscoveryNode sourceNode = findSourceNodeForPeerRecovery(logger, routingTable, nodes, shardRouting); - if (recoveryState.getSourceNode().equals(sourceNode) == false) { - if (recoveryTargetService.cancelRecoveriesForShard(currentRoutingEntry.shardId(), "recovery source node changed")) { - // getting here means that the shard was still recovering - logger.debug("[{}][{}] removing shard (recovery source changed), current [{}], global [{}])", shardRouting.index(), shardRouting.id(), currentRoutingEntry, shardRouting); - indexService.removeShard(shardRouting.id(), "removing shard (recovery source node changed)"); - shardHasBeenRemoved = true; + // fail shards that would be created or updated by createOrUpdateShards + RoutingNode localRoutingNode = state.getRoutingNodes().node(state.nodes().getLocalNodeId()); + if (localRoutingNode != null) { + for (final ShardRouting shardRouting : localRoutingNode) { + if (shardRouting.index().equals(index) && failedShardsCache.containsKey(shardRouting.shardId()) == false) { + sendFailShard(shardRouting, "failed to update mapping for index", t); + } } } } - - if (shardHasBeenRemoved == false) { - try { - indexShard.updateRoutingEntry(shardRouting, event.state().blocks().disableStatePersistence() == false); - } catch (Throwable e) { - failAndRemoveShard(shardRouting, indexService, true, "failed updating shard routing entry", e); - } - } - } - - if (shardRouting.initializing()) { - applyInitializingShard(event.state(), indexService, shardRouting); } } } - private void applyInitializingShard(final ClusterState state, IndexService indexService, final ShardRouting shardRouting) { - final RoutingTable routingTable = state.routingTable(); - final DiscoveryNodes nodes = state.getNodes(); - final int shardId = shardRouting.id(); + private void createOrUpdateShards(final ClusterState state) { + RoutingNode localRoutingNode = state.getRoutingNodes().node(state.nodes().getLocalNodeId()); + if (localRoutingNode == null) { + return; + } - if (indexService.hasShard(shardId)) { - IndexShard indexShard = indexService.getShard(shardId); - if (indexShard.state() == IndexShardState.STARTED || indexShard.state() == IndexShardState.POST_RECOVERY) { - // the master thinks we are initializing, but we are already started or on POST_RECOVERY and waiting - // for master to confirm a shard started message (either master failover, or a cluster event before - // we managed to tell the master we started), mark us as started - if (logger.isTraceEnabled()) { - logger.trace("{} master marked shard as initializing, but shard has state [{}], resending shard started to {}", - indexShard.shardId(), indexShard.state(), nodes.getMasterNode()); - } - if (nodes.getMasterNode() != null) { - shardStateAction.shardStarted(shardRouting, - "master " + nodes.getMasterNode() + " marked shard as initializing, but shard state is [" + indexShard.state() + "], mark shard as started", - SHARD_STATE_ACTION_LISTENER); - } - return; - } else { - if (indexShard.ignoreRecoveryAttempt()) { - logger.trace("ignoring recovery instruction for an existing shard {} (shard state: [{}])", indexShard.shardId(), indexShard.state()); - return; + DiscoveryNodes nodes = state.nodes(); + RoutingTable routingTable = state.routingTable(); + + for (final ShardRouting shardRouting : localRoutingNode) { + ShardId shardId = shardRouting.shardId(); + if (failedShardsCache.containsKey(shardId) == false) { + AllocatedIndex indexService = indicesService.indexService(shardId.getIndex()); + assert indexService != null : "index " + shardId.getIndex() + " should have been created by createIndices"; + Shard shard = indexService.getShardOrNull(shardId.id()); + if (shard == null) { + assert shardRouting.initializing() : shardRouting + " should have been removed by failMissingShards"; + createShard(nodes, routingTable, shardRouting, indexService); + } else { + updateShard(nodes, shardRouting, shard); } } } + } + + private void createShard(DiscoveryNodes nodes, RoutingTable routingTable, ShardRouting shardRouting, + AllocatedIndex indexService) { + assert shardRouting.initializing() : "only allow shard creation for initializing shard but was " + shardRouting; - // if we're in peer recovery, try to find out the source node now so in case it fails, we will not create the index shard DiscoveryNode sourceNode = null; if (shardRouting.isPeerRecovery()) { sourceNode = findSourceNodeForPeerRecovery(logger, routingTable, nodes, shardRouting); @@ -595,50 +516,49 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent { - try { - nodeServicesProvider.getClient().admin().indices().preparePutMapping() - .setConcreteIndex(indexService.index()) // concrete index - no name clash, it uses uuid - .setType(type) - .setSource(mapping.source().string()) - .get(); - } catch (IOException ex) { - throw new ElasticsearchException("failed to stringify mapping source", ex); - } - }, indicesService); + final IndexShardState state = shard.state(); + if (shardRouting.initializing() && (state == IndexShardState.STARTED || state == IndexShardState.POST_RECOVERY)) { + // the master thinks we are initializing, but we are already started or on POST_RECOVERY and waiting + // for master to confirm a shard started message (either master failover, or a cluster event before + // we managed to tell the master we started), mark us as started + if (logger.isTraceEnabled()) { + logger.trace("{} master marked shard as initializing, but shard has state [{}], resending shard started to {}", + shardRouting.shardId(), state, nodes.getMasterNode()); + } + if (nodes.getMasterNode() != null) { + shardStateAction.shardStarted(shardRouting, "master " + nodes.getMasterNode() + + " marked shard as initializing, but shard state is [" + state + "], mark shard as started", + SHARD_STATE_ACTION_LISTENER); + } + } } /** @@ -646,7 +566,8 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent indexService = indicesService.indexService(shardRouting.shardId().getIndex()); + if (indexService != null) { + indexService.removeShard(shardRouting.shardId().id(), message); } + } catch (ShardNotFoundException e) { + // the node got closed on us, ignore it + } catch (Throwable e1) { + logger.warn("[{}][{}] failed to remove shard after failure ([{}])", e1, shardRouting.getIndexName(), shardRouting.getId(), + message); } if (sendShardFailure) { sendFailShard(shardRouting, message, failure); @@ -760,23 +683,156 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent { @Override public void handle(final IndexShard.ShardFailure shardFailure) { - final IndexService indexService = indicesService.indexService(shardFailure.routing.shardId().getIndex()); final ShardRouting shardRouting = shardFailure.routing; threadPool.generic().execute(() -> { - synchronized (mutex) { - failAndRemoveShard(shardRouting, indexService, true, "shard failure, reason [" + shardFailure.reason + "]", shardFailure.cause); + synchronized (IndicesClusterStateService.this) { + failAndRemoveShard(shardRouting, true, "shard failure, reason [" + shardFailure.reason + "]", shardFailure.cause); } }); } } + + public interface Shard { + + /** + * Returns the shard id of this shard. + */ + ShardId shardId(); + + /** + * Returns the latest cluster routing entry received with this shard. + */ + ShardRouting routingEntry(); + + /** + * Returns the latest internal shard state. + */ + IndexShardState state(); + + /** + * Returns the recovery state associated with this shard. + */ + RecoveryState recoveryState(); + + /** + * Updates the shards routing entry. This mutate the shards internal state depending + * on the changes that get introduced by the new routing value. This method will persist shard level metadata. + * + * @throws IndexShardRelocatedException if shard is marked as relocated and relocation aborted + * @throws IOException if shard state could not be persisted + */ + void updateRoutingEntry(ShardRouting shardRouting) throws IOException; + } + + public interface AllocatedIndex extends Iterable, IndexComponent { + + /** + * Returns the index settings of this index. + */ + IndexSettings getIndexSettings(); + + /** + * Updates the meta data of this index. Changes become visible through {@link #getIndexSettings()} + */ + void updateMetaData(IndexMetaData indexMetaData); + + /** + * Checks if index requires refresh from master. + */ + boolean updateMapping(IndexMetaData indexMetaData) throws IOException; + + /** + * Returns shard with given id. + */ + @Nullable T getShardOrNull(int shardId); + + /** + * Removes shard with given id. + */ + void removeShard(int shardId, String message); + } + + public interface AllocatedIndices> extends Iterable { + + /** + * Creates a new {@link IndexService} for the given metadata. + * @param indexMetaData the index metadata to create the index for + * @param builtInIndexListener a list of built-in lifecycle {@link IndexEventListener} that should should be used along side with + * the per-index listeners + * @throws IndexAlreadyExistsException if the index already exists. + */ + U createIndex(NodeServicesProvider nodeServicesProvider, IndexMetaData indexMetaData, + List builtInIndexListener) throws IOException; + + /** + * Verify that the contents on disk for the given index is deleted; if not, delete the contents. + * This method assumes that an index is already deleted in the cluster state and/or explicitly + * through index tombstones. + * @param index {@code Index} to make sure its deleted from disk + * @param clusterState {@code ClusterState} to ensure the index is not part of it + * @return IndexMetaData for the index loaded from disk + */ + IndexMetaData verifyIndexIsDeleted(Index index, ClusterState clusterState); + + /** + * Deletes the given index. Persistent parts of the index + * like the shards files, state and transaction logs are removed once all resources are released. + * + * Equivalent to {@link #removeIndex(Index, String)} but fires + * different lifecycle events to ensure pending resources of this index are immediately removed. + * @param index the index to delete + * @param reason the high level reason causing this delete + */ + void deleteIndex(Index index, String reason); + + /** + * Deletes an index that is not assigned to this node. This method cleans up all disk folders relating to the index + * but does not deal with in-memory structures. For those call {@link #deleteIndex(Index, String)} + */ + void deleteUnassignedIndex(String reason, IndexMetaData metaData, ClusterState clusterState); + + /** + * Removes the given index from this service and releases all associated resources. Persistent parts of the index + * like the shards files, state and transaction logs are kept around in the case of a disaster recovery. + * @param index the index to remove + * @param reason the high level reason causing this removal + */ + void removeIndex(Index index, String reason); + + /** + * Returns an IndexService for the specified index if exists otherwise returns null. + */ + @Nullable U indexService(Index index); + + /** + * Creates shard for the specified shard routing and starts recovery, + */ + T createShard(ShardRouting shardRouting, RecoveryState recoveryState, RecoveryTargetService recoveryTargetService, + RecoveryTargetService.RecoveryListener recoveryListener, RepositoriesService repositoriesService, + NodeServicesProvider nodeServicesProvider, Callback onShardFailure) throws IOException; + + /** + * Returns shard for the specified id if it exists otherwise returns null. + */ + default T getShardOrNull(ShardId shardId) { + U indexRef = indexService(shardId.getIndex()); + if (indexRef != null) { + return indexRef.getShardOrNull(shardId.id()); + } + return null; + } + + void processPendingDeletes(Index index, IndexSettings indexSettings, TimeValue timeValue) throws IOException, InterruptedException; + } } diff --git a/core/src/test/java/org/elasticsearch/action/support/master/TransportMasterNodeActionUtils.java b/core/src/test/java/org/elasticsearch/action/support/master/TransportMasterNodeActionUtils.java new file mode 100644 index 00000000000..be4a7b29703 --- /dev/null +++ b/core/src/test/java/org/elasticsearch/action/support/master/TransportMasterNodeActionUtils.java @@ -0,0 +1,38 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.support.master; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.cluster.ClusterState; + +public class TransportMasterNodeActionUtils { + + /** + * Allows to directly call {@link TransportMasterNodeAction#masterOperation(MasterNodeRequest, ClusterState, ActionListener)} which is + * a protected method. + */ + public static , Response extends ActionResponse> void runMasterOperation( + TransportMasterNodeAction masterNodeAction, Request request, ClusterState clusterState, + ActionListener actionListener) throws Exception { + assert masterNodeAction.checkBlock(request, clusterState) == null; + masterNodeAction.masterOperation(request, clusterState, actionListener); + } +} diff --git a/core/src/test/java/org/elasticsearch/cluster/ClusterChangedEventTests.java b/core/src/test/java/org/elasticsearch/cluster/ClusterChangedEventTests.java index 731ecb859ee..72748a59986 100644 --- a/core/src/test/java/org/elasticsearch/cluster/ClusterChangedEventTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/ClusterChangedEventTests.java @@ -140,24 +140,24 @@ public class ClusterChangedEventTests extends ESTestCase { */ public void testIndexMetaDataChange() { final int numNodesInCluster = 3; - final ClusterState originalState = createState(numNodesInCluster, randomBoolean(), initialIndices); - final ClusterState newState = originalState; // doesn't matter for this test, just need a non-null value - final ClusterChangedEvent event = new ClusterChangedEvent("_na_", originalState, newState); + final ClusterState state = createState(numNodesInCluster, randomBoolean(), initialIndices); // test when its not the same IndexMetaData final Index index = initialIndices.get(0); - final IndexMetaData originalIndexMeta = originalState.metaData().index(index); + final IndexMetaData originalIndexMeta = state.metaData().index(index); // make sure the metadata is actually on the cluster state assertNotNull("IndexMetaData for " + index + " should exist on the cluster state", originalIndexMeta); IndexMetaData newIndexMeta = createIndexMetadata(index, originalIndexMeta.getVersion() + 1); - assertTrue("IndexMetaData with different version numbers must be considered changed", event.indexMetaDataChanged(newIndexMeta)); + assertTrue("IndexMetaData with different version numbers must be considered changed", + ClusterChangedEvent.indexMetaDataChanged(originalIndexMeta, newIndexMeta)); // test when it doesn't exist newIndexMeta = createIndexMetadata(new Index("doesntexist", UUIDs.randomBase64UUID())); - assertTrue("IndexMetaData that didn't previously exist should be considered changed", event.indexMetaDataChanged(newIndexMeta)); + assertTrue("IndexMetaData that didn't previously exist should be considered changed", + ClusterChangedEvent.indexMetaDataChanged(originalIndexMeta, newIndexMeta)); // test when its the same IndexMetaData - assertFalse("IndexMetaData should be the same", event.indexMetaDataChanged(originalIndexMeta)); + assertFalse("IndexMetaData should be the same", ClusterChangedEvent.indexMetaDataChanged(originalIndexMeta, originalIndexMeta)); } /** diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/RandomAllocationDeciderTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/RandomAllocationDeciderTests.java index 307df91c302..1f39706e4f4 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/RandomAllocationDeciderTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/RandomAllocationDeciderTests.java @@ -161,7 +161,7 @@ public class RandomAllocationDeciderTests extends ESAllocationTestCase { } } - private static final class RandomAllocationDecider extends AllocationDecider { + public static final class RandomAllocationDecider extends AllocationDecider { private final Random random; diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 8889ebfb910..40a23ee66cf 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -217,24 +217,14 @@ public class IndexShardTests extends ESSingleNodeTestCase { ShardStateMetaData shardStateMetaData = load(logger, env.availableShardPaths(shard.shardId)); assertEquals(getShardStateMetadata(shard), shardStateMetaData); ShardRouting routing = shard.shardRouting; - shard.updateRoutingEntry(routing, true); + shard.updateRoutingEntry(routing); shardStateMetaData = load(logger, env.availableShardPaths(shard.shardId)); assertEquals(shardStateMetaData, getShardStateMetadata(shard)); assertEquals(shardStateMetaData, new ShardStateMetaData(routing.primary(), shard.indexSettings().getUUID(), routing.allocationId())); - // check that we don't write shard state metadata if persist == false - ShardRouting updatedRouting = shard.shardRouting; - updatedRouting = TestShardRouting.relocate(shard.shardRouting, "some node", 42L); - shard.updateRoutingEntry(updatedRouting, false); - shardStateMetaData = load(logger, env.availableShardPaths(shard.shardId)); - assertFalse("shard state persisted despite of persist=false", shardStateMetaData.equals(getShardStateMetadata(shard))); - assertEquals("shard state persisted despite of persist=false", shardStateMetaData, new ShardStateMetaData(routing.primary(), shard.indexSettings().getUUID(), routing.allocationId())); - - // check that we write shard state metadata if persist == true - shard.updateRoutingEntry(routing, false); // move back state in IndexShard - routing = updatedRouting; - shard.updateRoutingEntry(routing, true); + routing = TestShardRouting.relocate(shard.shardRouting, "some node", 42L); + shard.updateRoutingEntry(routing); shardStateMetaData = load(logger, env.availableShardPaths(shard.shardId)); assertEquals(shardStateMetaData, getShardStateMetadata(shard)); assertEquals(shardStateMetaData, new ShardStateMetaData(routing.primary(), shard.indexSettings().getUUID(), routing.allocationId())); @@ -336,7 +326,7 @@ public class IndexShardTests extends ESSingleNodeTestCase { // simulate promotion ShardRouting newReplicaShardRouting = TestShardRouting.newShardRouting(temp.shardId(), temp.currentNodeId(), null, false, ShardRoutingState.STARTED, temp.allocationId()); - indexShard.updateRoutingEntry(newReplicaShardRouting, false); + indexShard.updateRoutingEntry(newReplicaShardRouting); primaryTerm = primaryTerm + 1; indexShard.updatePrimaryTerm(primaryTerm); newPrimaryShardRouting = TestShardRouting.newShardRouting(temp.shardId(), temp.currentNodeId(), null, @@ -344,7 +334,7 @@ public class IndexShardTests extends ESSingleNodeTestCase { } else { newPrimaryShardRouting = temp; } - indexShard.updateRoutingEntry(newPrimaryShardRouting, false); + indexShard.updateRoutingEntry(newPrimaryShardRouting); assertEquals(0, indexShard.getActiveOperationsCount()); if (newPrimaryShardRouting.isRelocationTarget() == false) { @@ -381,7 +371,7 @@ public class IndexShardTests extends ESSingleNodeTestCase { newShardRouting = TestShardRouting.newShardRouting(temp.shardId(), temp.currentNodeId(), null, false, ShardRoutingState.STARTED, AllocationId.newRelocation(temp.allocationId())); - indexShard.updateRoutingEntry(newShardRouting, false); + indexShard.updateRoutingEntry(newShardRouting); break; case 1: // initializing replica / primary @@ -391,13 +381,13 @@ public class IndexShardTests extends ESSingleNodeTestCase { relocating ? randomBoolean() : false, ShardRoutingState.INITIALIZING, relocating ? AllocationId.newRelocation(temp.allocationId()) : temp.allocationId()); - indexShard.updateRoutingEntry(newShardRouting, false); + indexShard.updateRoutingEntry(newShardRouting); break; case 2: // relocation source newShardRouting = TestShardRouting.newShardRouting(temp.shardId(), temp.currentNodeId(), "otherNode", false, ShardRoutingState.RELOCATING, AllocationId.newRelocation(temp.allocationId())); - indexShard.updateRoutingEntry(newShardRouting, false); + indexShard.updateRoutingEntry(newShardRouting); indexShard.relocated("test"); break; default: @@ -983,7 +973,7 @@ public class IndexShardTests extends ESSingleNodeTestCase { test.removeShard(0, "b/c simon says so"); routing = ShardRoutingHelper.reinit(routing); IndexShard newShard = test.createShard(routing); - newShard.updateRoutingEntry(routing, false); + newShard.updateRoutingEntry(routing); DiscoveryNode localNode = new DiscoveryNode("foo", DummyTransportAddress.INSTANCE, emptyMap(), emptySet(), Version.CURRENT); newShard.markAsRecovering("store", new RecoveryState(newShard.shardId(), routing.primary(), RecoveryState.Type.STORE, localNode, localNode)); assertTrue(newShard.recoverFromStore()); @@ -991,7 +981,7 @@ public class IndexShardTests extends ESSingleNodeTestCase { assertEquals(translogOps, newShard.recoveryState().getTranslog().totalOperations()); assertEquals(translogOps, newShard.recoveryState().getTranslog().totalOperationsOnStart()); assertEquals(100.0f, newShard.recoveryState().getTranslog().recoveredPercent(), 0.01f); - newShard.updateRoutingEntry(routing.moveToStarted(), true); + newShard.updateRoutingEntry(routing.moveToStarted()); SearchResponse response = client().prepareSearch().get(); assertHitCount(response, 1); } @@ -1010,7 +1000,7 @@ public class IndexShardTests extends ESSingleNodeTestCase { test.removeShard(0, "b/c simon says so"); routing = ShardRoutingHelper.reinit(routing, UnassignedInfo.Reason.INDEX_CREATED); IndexShard newShard = test.createShard(routing); - newShard.updateRoutingEntry(routing, false); + newShard.updateRoutingEntry(routing); DiscoveryNode localNode = new DiscoveryNode("foo", DummyTransportAddress.INSTANCE, emptyMap(), emptySet(), Version.CURRENT); newShard.markAsRecovering("store", new RecoveryState(newShard.shardId(), routing.primary(), RecoveryState.Type.STORE, localNode, localNode)); @@ -1019,7 +1009,7 @@ public class IndexShardTests extends ESSingleNodeTestCase { assertEquals(0, newShard.recoveryState().getTranslog().totalOperations()); assertEquals(0, newShard.recoveryState().getTranslog().totalOperationsOnStart()); assertEquals(100.0f, newShard.recoveryState().getTranslog().recoveredPercent(), 0.01f); - newShard.updateRoutingEntry(getInitializingShardRouting(routing).moveToStarted(), true); + newShard.updateRoutingEntry(getInitializingShardRouting(routing).moveToStarted()); SearchResponse response = client().prepareSearch().get(); assertHitCount(response, 0); } @@ -1045,7 +1035,7 @@ public class IndexShardTests extends ESSingleNodeTestCase { store.decRef(); routing = ShardRoutingHelper.reinit(routing); IndexShard newShard = test.createShard(routing); - newShard.updateRoutingEntry(routing, false); + newShard.updateRoutingEntry(routing); newShard.markAsRecovering("store", new RecoveryState(newShard.shardId(), routing.primary(), RecoveryState.Type.STORE, localNode, localNode)); try { newShard.recoverFromStore(); @@ -1065,11 +1055,11 @@ public class IndexShardTests extends ESSingleNodeTestCase { } test.removeShard(0, "I broken it"); newShard = test.createShard(routing); - newShard.updateRoutingEntry(routing, false); + newShard.updateRoutingEntry(routing); newShard.markAsRecovering("store", new RecoveryState(newShard.shardId(), routing.primary(), RecoveryState.Type.STORE, localNode, localNode)); assertTrue("recover even if there is nothing to recover", newShard.recoverFromStore()); - newShard.updateRoutingEntry(getInitializingShardRouting(routing).moveToStarted(), true); + newShard.updateRoutingEntry(getInitializingShardRouting(routing).moveToStarted()); SearchResponse response = client().prepareSearch().get(); assertHitCount(response, 0); // we can't issue this request through a client because of the inconsistencies we created with the cluster state @@ -1090,11 +1080,11 @@ public class IndexShardTests extends ESSingleNodeTestCase { ShardRouting origRouting = shard.routingEntry(); assertThat(shard.state(), equalTo(IndexShardState.STARTED)); ShardRouting inRecoveryRouting = ShardRoutingHelper.relocate(origRouting, "some_node"); - shard.updateRoutingEntry(inRecoveryRouting, true); + shard.updateRoutingEntry(inRecoveryRouting); shard.relocated("simulate mark as relocated"); assertThat(shard.state(), equalTo(IndexShardState.RELOCATED)); try { - shard.updateRoutingEntry(origRouting, true); + shard.updateRoutingEntry(origRouting); fail("Expected IndexShardRelocatedException"); } catch (IndexShardRelocatedException expected) { } @@ -1123,7 +1113,7 @@ public class IndexShardTests extends ESSingleNodeTestCase { Store sourceStore = test_shard.store(); Store targetStore = test_target_shard.store(); - test_target_shard.updateRoutingEntry(routing, false); + test_target_shard.updateRoutingEntry(routing); DiscoveryNode localNode = new DiscoveryNode("foo", DummyTransportAddress.INSTANCE, emptyMap(), emptySet(), Version.CURRENT); test_target_shard.markAsRecovering("store", new RecoveryState(routing.shardId(), routing.primary(), RecoveryState.Type.SNAPSHOT, routing.restoreSource(), localNode)); assertTrue(test_target_shard.restoreFromRepository(new IndexShardRepository() { @@ -1156,7 +1146,7 @@ public class IndexShardTests extends ESSingleNodeTestCase { } })); - test_target_shard.updateRoutingEntry(routing.moveToStarted(), true); + test_target_shard.updateRoutingEntry(routing.moveToStarted()); assertHitCount(client().prepareSearch("test_target").get(), 1); assertSearchHits(client().prepareSearch("test_target").get(), "0"); } @@ -1411,7 +1401,7 @@ public class IndexShardTests extends ESSingleNodeTestCase { DiscoveryNode localNode = new DiscoveryNode("foo", DummyTransportAddress.INSTANCE, emptyMap(), emptySet(), Version.CURRENT); newShard.markAsRecovering("store", new RecoveryState(newShard.shardId(), newShard.routingEntry().primary(), RecoveryState.Type.STORE, localNode, localNode)); assertTrue(newShard.recoverFromStore()); - newShard.updateRoutingEntry(newShard.routingEntry().moveToStarted(), true); + newShard.updateRoutingEntry(newShard.routingEntry().moveToStarted()); return newShard; } @@ -1543,7 +1533,7 @@ public class IndexShardTests extends ESSingleNodeTestCase { DiscoveryNode localNode = new DiscoveryNode("foo", DummyTransportAddress.INSTANCE, emptyMap(), emptySet(), Version.CURRENT); { final IndexShard newShard = test.createShard(routing); - newShard.updateRoutingEntry(routing, false); + newShard.updateRoutingEntry(routing); newShard.markAsRecovering("store", new RecoveryState(newShard.shardId(), routing.primary(), RecoveryState.Type.LOCAL_SHARDS, localNode, localNode)); BiConsumer mappingConsumer = (type, mapping) -> { @@ -1575,7 +1565,7 @@ public class IndexShardTests extends ESSingleNodeTestCase { } } routing = ShardRoutingHelper.moveToStarted(routing); - newShard.updateRoutingEntry(routing, true); + newShard.updateRoutingEntry(routing); assertHitCount(client().prepareSearch("index_1").get(), 2); } // now check that it's persistent ie. that the added shards are committed @@ -1587,7 +1577,7 @@ public class IndexShardTests extends ESSingleNodeTestCase { newShard.markAsRecovering("store", new RecoveryState(newShard.shardId(), routing.primary(), RecoveryState.Type.LOCAL_SHARDS, localNode, localNode)); assertTrue(newShard.recoverFromStore()); routing = ShardRoutingHelper.moveToStarted(routing); - newShard.updateRoutingEntry(routing, true); + newShard.updateRoutingEntry(routing); assertHitCount(client().prepareSearch("index_1").get(), 2); } diff --git a/core/src/test/java/org/elasticsearch/indices/IndexingMemoryControllerTests.java b/core/src/test/java/org/elasticsearch/indices/IndexingMemoryControllerTests.java index 2722fc9d9d3..1f1b758f349 100644 --- a/core/src/test/java/org/elasticsearch/indices/IndexingMemoryControllerTests.java +++ b/core/src/test/java/org/elasticsearch/indices/IndexingMemoryControllerTests.java @@ -455,7 +455,7 @@ public class IndexingMemoryControllerTests extends ESSingleNodeTestCase { assertEquals(1, imc.availableShards().size()); assertTrue(newShard.recoverFromStore()); assertTrue("we should have flushed in IMC at least once but did: " + flushes.get(), flushes.get() >= 1); - newShard.updateRoutingEntry(routing.moveToStarted(), true); + newShard.updateRoutingEntry(routing.moveToStarted()); } finally { newShard.close("simon says", false); } diff --git a/core/src/test/java/org/elasticsearch/indices/IndicesLifecycleListenerSingleNodeTests.java b/core/src/test/java/org/elasticsearch/indices/IndicesLifecycleListenerSingleNodeTests.java index 8d59da7da01..92a411a95de 100644 --- a/core/src/test/java/org/elasticsearch/indices/IndicesLifecycleListenerSingleNodeTests.java +++ b/core/src/test/java/org/elasticsearch/indices/IndicesLifecycleListenerSingleNodeTests.java @@ -102,13 +102,13 @@ public class IndicesLifecycleListenerSingleNodeTests extends ESSingleNodeTestCas newRouting = ShardRoutingHelper.moveToUnassigned(newRouting, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "boom")); newRouting = ShardRoutingHelper.initialize(newRouting, nodeId); IndexShard shard = index.createShard(newRouting); - shard.updateRoutingEntry(newRouting, true); + shard.updateRoutingEntry(newRouting); final DiscoveryNode localNode = new DiscoveryNode("foo", DummyTransportAddress.INSTANCE, emptyMap(), emptySet(), Version.CURRENT); shard.markAsRecovering("store", new RecoveryState(shard.shardId(), newRouting.primary(), RecoveryState.Type.SNAPSHOT, newRouting.restoreSource(), localNode)); shard.recoverFromStore(); newRouting = ShardRoutingHelper.moveToStarted(newRouting); - shard.updateRoutingEntry(newRouting, true); + shard.updateRoutingEntry(newRouting); } finally { indicesService.deleteIndex(idx, "simon says"); } diff --git a/core/src/test/java/org/elasticsearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java b/core/src/test/java/org/elasticsearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java new file mode 100644 index 00000000000..69bee510710 --- /dev/null +++ b/core/src/test/java/org/elasticsearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java @@ -0,0 +1,318 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.indices.cluster; + +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.routing.RoutingNode; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.Callback; +import org.elasticsearch.index.Index; +import org.elasticsearch.index.IndexService; +import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.NodeServicesProvider; +import org.elasticsearch.index.shard.IndexEventListener; +import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.shard.IndexShardState; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.indices.cluster.IndicesClusterStateService.AllocatedIndex; +import org.elasticsearch.indices.cluster.IndicesClusterStateService.Shard; +import org.elasticsearch.indices.cluster.IndicesClusterStateService.AllocatedIndices; +import org.elasticsearch.indices.recovery.RecoveryState; +import org.elasticsearch.indices.recovery.RecoveryTargetService; +import org.elasticsearch.repositories.RepositoriesService; +import org.elasticsearch.test.ESTestCase; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentMap; + +import static java.util.Collections.emptyMap; +import static java.util.Collections.unmodifiableMap; +import static org.elasticsearch.common.collect.MapBuilder.newMapBuilder; +import static org.hamcrest.Matchers.equalTo; + +/** + * Abstract base class for tests against {@link IndicesClusterStateService} + */ +public abstract class AbstractIndicesClusterStateServiceTestCase extends ESTestCase { + + + protected void failRandomly() { + if (rarely()) { + throw new RuntimeException("dummy test failure"); + } + } + + /** + * Checks if cluster state matches internal state of IndicesClusterStateService instance + * + * @param state cluster state used for matching + */ + public static void assertClusterStateMatchesNodeState(ClusterState state, IndicesClusterStateService indicesClusterStateService) { + AllocatedIndices> indicesService = + indicesClusterStateService.indicesService; + ConcurrentMap failedShardsCache = indicesClusterStateService.failedShardsCache; + RoutingNode localRoutingNode = state.getRoutingNodes().node(state.getNodes().getLocalNodeId()); + if (localRoutingNode != null) { + // check that all shards in local routing nodes have been allocated + for (ShardRouting shardRouting : localRoutingNode) { + Index index = shardRouting.index(); + IndexMetaData indexMetaData = state.metaData().getIndexSafe(index); + + Shard shard = indicesService.getShardOrNull(shardRouting.shardId()); + ShardRouting failedShard = failedShardsCache.get(shardRouting.shardId()); + if (shard == null && failedShard == null) { + fail("Shard with id " + shardRouting + " expected but missing in indicesService and failedShardsCache"); + } + if (failedShard != null && failedShard.isSameAllocation(shardRouting) == false) { + fail("Shard cache has not been properly cleaned for " + failedShard); + } + + if (shard != null) { + AllocatedIndex indexService = indicesService.indexService(index); + assertTrue("Index " + index + " expected but missing in indicesService", indexService != null); + + // index metadata has been updated + assertThat(indexService.getIndexSettings().getIndexMetaData(), equalTo(indexMetaData)); + // shard has been created + if (failedShard == null) { + assertTrue("Shard with id " + shardRouting + " expected but missing in indexService", + shard != null); + // shard has latest shard routing + assertThat(shard.routingEntry(), equalTo(shardRouting)); + } + } + } + } + + // all other shards / indices have been cleaned up + for (AllocatedIndex indexService : indicesService) { + assertTrue(state.metaData().getIndexSafe(indexService.index()) != null); + + boolean shardsFound = false; + for (Shard shard : indexService) { + shardsFound = true; + ShardRouting persistedShardRouting = shard.routingEntry(); + boolean found = false; + for (ShardRouting shardRouting : localRoutingNode) { + if (persistedShardRouting.equals(shardRouting)) { + found = true; + } + } + assertTrue(found); + } + + if (shardsFound == false) { + // check if we have shards of that index in failedShardsCache + // if yes, we might not have cleaned the index as failedShardsCache can be populated by another thread + assertFalse(failedShardsCache.keySet().stream().noneMatch(shardId -> shardId.getIndex().equals(indexService.index()))); + } + + } + } + + /** + * Mock for {@link IndicesService} + */ + protected class MockIndicesService implements AllocatedIndices { + private volatile Map indices = emptyMap(); + + @Override + public synchronized MockIndexService createIndex(NodeServicesProvider nodeServicesProvider, IndexMetaData indexMetaData, + List buildInIndexListener) throws IOException { + MockIndexService indexService = new MockIndexService(new IndexSettings(indexMetaData, Settings.EMPTY)); + indices = newMapBuilder(indices).put(indexMetaData.getIndexUUID(), indexService).immutableMap(); + return indexService; + } + + @Override + public IndexMetaData verifyIndexIsDeleted(Index index, ClusterState state) { + return null; + } + + @Override + public void deleteUnassignedIndex(String reason, IndexMetaData metaData, ClusterState clusterState) { + + } + + @Override + public synchronized void deleteIndex(Index index, String reason) { + if (hasIndex(index) == false) { + return; + } + Map newIndices = new HashMap<>(indices); + newIndices.remove(index.getUUID()); + indices = unmodifiableMap(newIndices); + } + + @Override + public synchronized void removeIndex(Index index, String reason) { + if (hasIndex(index) == false) { + return; + } + Map newIndices = new HashMap<>(indices); + newIndices.remove(index.getUUID()); + indices = unmodifiableMap(newIndices); + } + + @Override + public @Nullable MockIndexService indexService(Index index) { + return indices.get(index.getUUID()); + } + + @Override + public MockIndexShard createShard(ShardRouting shardRouting, RecoveryState recoveryState, + RecoveryTargetService recoveryTargetService, + RecoveryTargetService.RecoveryListener recoveryListener, RepositoriesService repositoriesService, + NodeServicesProvider nodeServicesProvider, Callback onShardFailure) + throws IOException { + failRandomly(); + MockIndexService indexService = indexService(recoveryState.getShardId().getIndex()); + MockIndexShard indexShard = indexService.createShard(shardRouting); + indexShard.recoveryState = recoveryState; + return indexShard; + } + + @Override + public void processPendingDeletes(Index index, IndexSettings indexSettings, TimeValue timeValue) throws IOException, + InterruptedException { + + } + + private boolean hasIndex(Index index) { + return indices.containsKey(index.getUUID()); + } + + @Override + public Iterator iterator() { + return indices.values().iterator(); + } + } + + /** + * Mock for {@link IndexService} + */ + protected class MockIndexService implements AllocatedIndex { + private volatile Map shards = emptyMap(); + + private final IndexSettings indexSettings; + + public MockIndexService(IndexSettings indexSettings) { + this.indexSettings = indexSettings; + } + + @Override + public IndexSettings getIndexSettings() { + return indexSettings; + } + + @Override + public boolean updateMapping(IndexMetaData indexMetaData) throws IOException { + failRandomly(); + return false; + } + + @Override + public void updateMetaData(IndexMetaData indexMetaData) { + indexSettings.updateIndexMetaData(indexMetaData); + } + + @Override + public MockIndexShard getShardOrNull(int shardId) { + return shards.get(shardId); + } + + public synchronized MockIndexShard createShard(ShardRouting routing) throws IOException { + failRandomly(); + MockIndexShard shard = new MockIndexShard(routing); + shards = newMapBuilder(shards).put(routing.id(), shard).immutableMap(); + return shard; + } + + @Override + public synchronized void removeShard(int shardId, String reason) { + if (shards.containsKey(shardId) == false) { + return; + } + HashMap newShards = new HashMap<>(shards); + MockIndexShard indexShard = newShards.remove(shardId); + assert indexShard != null; + shards = unmodifiableMap(newShards); + } + + @Override + public Iterator iterator() { + return shards.values().iterator(); + } + + @Override + public Index index() { + return indexSettings.getIndex(); + } + } + + /** + * Mock for {@link IndexShard} + */ + protected class MockIndexShard implements IndicesClusterStateService.Shard { + private volatile ShardRouting shardRouting; + private volatile RecoveryState recoveryState; + + public MockIndexShard(ShardRouting shardRouting) { + this.shardRouting = shardRouting; + } + + @Override + public ShardId shardId() { + return shardRouting.shardId(); + } + + @Override + public RecoveryState recoveryState() { + return recoveryState; + } + + @Override + public ShardRouting routingEntry() { + return shardRouting; + } + + @Override + public IndexShardState state() { + return null; + } + + @Override + public void updateRoutingEntry(ShardRouting shardRouting) throws IOException { + failRandomly(); + assert this.shardId().equals(shardRouting.shardId()); + assert this.shardRouting.isSameAllocation(shardRouting); + this.shardRouting = shardRouting; + } + } +} diff --git a/core/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java b/core/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java new file mode 100644 index 00000000000..84e83db6d1d --- /dev/null +++ b/core/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java @@ -0,0 +1,234 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.indices.cluster; + +import org.elasticsearch.Version; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteRequest; +import org.elasticsearch.action.admin.cluster.reroute.TransportClusterRerouteAction; +import org.elasticsearch.action.admin.indices.close.CloseIndexRequest; +import org.elasticsearch.action.admin.indices.close.TransportCloseIndexAction; +import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; +import org.elasticsearch.action.admin.indices.create.TransportCreateIndexAction; +import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; +import org.elasticsearch.action.admin.indices.delete.TransportDeleteIndexAction; +import org.elasticsearch.action.admin.indices.open.OpenIndexRequest; +import org.elasticsearch.action.admin.indices.open.TransportOpenIndexAction; +import org.elasticsearch.action.admin.indices.settings.put.TransportUpdateSettingsAction; +import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.DestructiveOperations; +import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.action.support.master.MasterNodeRequest; +import org.elasticsearch.action.support.master.TransportMasterNodeAction; +import org.elasticsearch.action.support.master.TransportMasterNodeActionUtils; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateUpdateTask; +import org.elasticsearch.cluster.EmptyClusterInfoService; +import org.elasticsearch.cluster.metadata.AliasValidator; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.metadata.MetaDataCreateIndexService; +import org.elasticsearch.cluster.metadata.MetaDataDeleteIndexService; +import org.elasticsearch.cluster.metadata.MetaDataIndexStateService; +import org.elasticsearch.cluster.metadata.MetaDataIndexUpgradeService; +import org.elasticsearch.cluster.metadata.MetaDataUpdateSettingsService; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.allocation.AllocationService; +import org.elasticsearch.cluster.routing.allocation.FailedRerouteAllocation; +import org.elasticsearch.cluster.routing.allocation.RandomAllocationDeciderTests; +import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; +import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator; +import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders; +import org.elasticsearch.cluster.routing.allocation.decider.ReplicaAfterPrimaryActiveAllocationDecider; +import org.elasticsearch.cluster.routing.allocation.decider.SameShardAllocationDecider; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.IndexScopedSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.env.Environment; +import org.elasticsearch.index.IndexService; +import org.elasticsearch.index.NodeServicesProvider; +import org.elasticsearch.index.mapper.MapperService; +import org.elasticsearch.index.shard.IndexEventListener; +import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.test.gateway.NoopGatewayAllocator; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.Transport; +import org.elasticsearch.transport.TransportService; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; + +import static com.carrotsearch.randomizedtesting.RandomizedTest.getRandom; +import static org.elasticsearch.env.Environment.PATH_HOME_SETTING; +import static org.hamcrest.Matchers.notNullValue; +import static org.junit.Assert.assertThat; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyBoolean; +import static org.mockito.Matchers.anyList; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class ClusterStateChanges { + + private final ClusterService clusterService; + private final AllocationService allocationService; + + // transport actions + private final TransportCloseIndexAction transportCloseIndexAction; + private final TransportOpenIndexAction transportOpenIndexAction; + private final TransportDeleteIndexAction transportDeleteIndexAction; + private final TransportUpdateSettingsAction transportUpdateSettingsAction; + private final TransportClusterRerouteAction transportClusterRerouteAction; + private final TransportCreateIndexAction transportCreateIndexAction; + + public ClusterStateChanges() { + Settings settings = Settings.builder().put(PATH_HOME_SETTING.getKey(), "dummy").build(); + + allocationService = new AllocationService(settings, new AllocationDeciders(settings, + new HashSet<>(Arrays.asList(new SameShardAllocationDecider(settings), + new ReplicaAfterPrimaryActiveAllocationDecider(settings), + new RandomAllocationDeciderTests.RandomAllocationDecider(getRandom())))), + NoopGatewayAllocator.INSTANCE, new BalancedShardsAllocator(settings), + EmptyClusterInfoService.INSTANCE); + ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + ActionFilters actionFilters = new ActionFilters(Collections.emptySet()); + IndexNameExpressionResolver indexNameExpressionResolver = new IndexNameExpressionResolver(settings); + DestructiveOperations destructiveOperations = new DestructiveOperations(settings, clusterSettings); + Environment environment = new Environment(settings); + ThreadPool threadPool = null; // it's not used + Transport transport = null; // it's not used + + // mocks + clusterService = mock(ClusterService.class); + IndicesService indicesService = mock(IndicesService.class); + // MetaDataCreateIndexService creates indices using its IndicesService instance to check mappings -> fake it here + try { + when(indicesService.createIndex(any(NodeServicesProvider.class), any(IndexMetaData.class), anyList())) + .then(invocationOnMock -> { + IndexService indexService = mock(IndexService.class); + IndexMetaData indexMetaData = (IndexMetaData)invocationOnMock.getArguments()[1]; + when(indexService.index()).thenReturn(indexMetaData.getIndex()); + MapperService mapperService = mock(MapperService.class); + when(indexService.mapperService()).thenReturn(mapperService); + when(mapperService.docMappers(anyBoolean())).thenReturn(Collections.emptyList()); + when(indexService.getIndexEventListener()).thenReturn(new IndexEventListener() {}); + return indexService; + }); + } catch (IOException e) { + throw new IllegalStateException(e); + } + + // services + TransportService transportService = new TransportService(settings, transport, threadPool, null); + MetaDataIndexUpgradeService metaDataIndexUpgradeService = new MetaDataIndexUpgradeService(settings, null, null) { + // metaData upgrader should do nothing + @Override + public IndexMetaData upgradeIndexMetaData(IndexMetaData indexMetaData) { + return indexMetaData; + } + }; + NodeServicesProvider nodeServicesProvider = new NodeServicesProvider(threadPool, null, null, null, null, null, clusterService); + MetaDataIndexStateService indexStateService = new MetaDataIndexStateService(settings, clusterService, allocationService, + metaDataIndexUpgradeService, nodeServicesProvider, indicesService); + MetaDataDeleteIndexService deleteIndexService = new MetaDataDeleteIndexService(settings, clusterService, allocationService); + MetaDataUpdateSettingsService metaDataUpdateSettingsService = new MetaDataUpdateSettingsService(settings, clusterService, + allocationService, IndexScopedSettings.DEFAULT_SCOPED_SETTINGS, new IndexNameExpressionResolver(settings)); + MetaDataCreateIndexService createIndexService = new MetaDataCreateIndexService(settings, clusterService, indicesService, + allocationService, Version.CURRENT, new AliasValidator(settings), Collections.emptySet(), environment, + nodeServicesProvider, IndexScopedSettings.DEFAULT_SCOPED_SETTINGS); + + transportCloseIndexAction = new TransportCloseIndexAction(settings, transportService, clusterService, threadPool, + indexStateService, clusterSettings, actionFilters, indexNameExpressionResolver, destructiveOperations); + transportOpenIndexAction = new TransportOpenIndexAction(settings, transportService, + clusterService, threadPool, indexStateService, actionFilters, indexNameExpressionResolver, destructiveOperations); + transportDeleteIndexAction = new TransportDeleteIndexAction(settings, transportService, + clusterService, threadPool, deleteIndexService, actionFilters, indexNameExpressionResolver, destructiveOperations); + transportUpdateSettingsAction = new TransportUpdateSettingsAction(settings, + transportService, clusterService, threadPool, metaDataUpdateSettingsService, actionFilters, indexNameExpressionResolver); + transportClusterRerouteAction = new TransportClusterRerouteAction(settings, + transportService, clusterService, threadPool, allocationService, actionFilters, indexNameExpressionResolver); + transportCreateIndexAction = new TransportCreateIndexAction(settings, + transportService, clusterService, threadPool, createIndexService, actionFilters, indexNameExpressionResolver); + } + + public ClusterState createIndex(ClusterState state, CreateIndexRequest request) { + return execute(transportCreateIndexAction, request, state); + } + + public ClusterState closeIndices(ClusterState state, CloseIndexRequest request) { + return execute(transportCloseIndexAction, request, state); + } + + public ClusterState openIndices(ClusterState state, OpenIndexRequest request) { + return execute(transportOpenIndexAction, request, state); + } + + public ClusterState deleteIndices(ClusterState state, DeleteIndexRequest request) { + return execute(transportDeleteIndexAction, request, state); + } + + public ClusterState updateSettings(ClusterState state, UpdateSettingsRequest request) { + return execute(transportUpdateSettingsAction, request, state); + } + + public ClusterState reroute(ClusterState state, ClusterRerouteRequest request) { + return execute(transportClusterRerouteAction, request, state); + } + + public ClusterState applyFailedShards(ClusterState clusterState, List failedShards) { + RoutingAllocation.Result rerouteResult = allocationService.applyFailedShards(clusterState, failedShards); + return ClusterState.builder(clusterState).routingResult(rerouteResult).build(); + } + + public ClusterState applyStartedShards(ClusterState clusterState, List startedShards) { + RoutingAllocation.Result rerouteResult = allocationService.applyStartedShards(clusterState, startedShards); + return ClusterState.builder(clusterState).routingResult(rerouteResult).build(); + } + + private , Response extends ActionResponse> ClusterState execute( + TransportMasterNodeAction masterNodeAction, Request request, ClusterState clusterState) { + return executeClusterStateUpdateTask(clusterState, () -> { + try { + TransportMasterNodeActionUtils.runMasterOperation(masterNodeAction, request, clusterState, new PlainActionFuture<>()); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + } + + private ClusterState executeClusterStateUpdateTask(ClusterState state, Runnable runnable) { + ClusterState[] result = new ClusterState[1]; + doAnswer(invocationOnMock -> { + ClusterStateUpdateTask task = (ClusterStateUpdateTask)invocationOnMock.getArguments()[1]; + result[0] = task.execute(state); + return null; + }).when(clusterService).submitStateUpdateTask(anyString(), any(ClusterStateUpdateTask.class)); + runnable.run(); + assertThat(result[0], notNullValue()); + return result[0]; + } +} diff --git a/core/src/test/java/org/elasticsearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java b/core/src/test/java/org/elasticsearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java new file mode 100644 index 00000000000..8c63c001a1e --- /dev/null +++ b/core/src/test/java/org/elasticsearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java @@ -0,0 +1,281 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.indices.cluster; + +import org.elasticsearch.Version; +import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteRequest; +import org.elasticsearch.action.admin.indices.close.CloseIndexRequest; +import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; +import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; +import org.elasticsearch.action.admin.indices.open.OpenIndexRequest; +import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest; +import org.elasticsearch.action.support.replication.ClusterStateCreationUtils; +import org.elasticsearch.cluster.ClusterChangedEvent; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.action.shard.ShardStateAction; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.allocation.FailedRerouteAllocation.FailedShard; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.transport.DummyTransportAddress; +import org.elasticsearch.common.util.set.Sets; +import org.elasticsearch.indices.recovery.RecoveryTargetService; +import org.elasticsearch.repositories.RepositoriesService; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.Executor; + +import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS; +import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class IndicesClusterStateServiceRandomUpdatesTests extends AbstractIndicesClusterStateServiceTestCase { + + private final ClusterStateChanges cluster = new ClusterStateChanges(); + + public void testRandomClusterStateUpdates() { + // we have an IndicesClusterStateService per node in the cluster + final Map clusterStateServiceMap = new HashMap<>(); + ClusterState state = randomInitialClusterState(clusterStateServiceMap); + + // each of the following iterations represents a new cluster state update processed on all nodes + for (int i = 0; i < 30; i++) { + logger.info("Iteration {}", i); + final ClusterState previousState = state; + + // calculate new cluster state + for (int j = 0; j < randomInt(3); j++) { // multiple iterations to simulate batching of cluster states + state = randomlyUpdateClusterState(state, clusterStateServiceMap); + } + + // apply cluster state to nodes (incl. master) + for (DiscoveryNode node : state.nodes()) { + IndicesClusterStateService indicesClusterStateService = clusterStateServiceMap.get(node); + ClusterState localState = adaptClusterStateToLocalNode(state, node); + ClusterState previousLocalState = adaptClusterStateToLocalNode(previousState, node); + indicesClusterStateService.clusterChanged(new ClusterChangedEvent("simulated change " + i, localState, previousLocalState)); + + // check that cluster state has been properly applied to node + assertClusterStateMatchesNodeState(localState, indicesClusterStateService); + } + } + + // TODO: check if we can go to green by starting all shards and finishing all iterations + logger.info("Final cluster state: {}", state.prettyPrint()); + } + + public ClusterState randomInitialClusterState(Map clusterStateServiceMap) { + List allNodes = new ArrayList<>(); + DiscoveryNode localNode = createNode(DiscoveryNode.Role.MASTER); // local node is the master + allNodes.add(localNode); + // at least two nodes that have the data role so that we can allocate shards + allNodes.add(createNode(DiscoveryNode.Role.DATA)); + allNodes.add(createNode(DiscoveryNode.Role.DATA)); + for (int i = 0; i < randomIntBetween(2, 5); i++) { + allNodes.add(createNode()); + } + ClusterState state = ClusterStateCreationUtils.state(localNode, localNode, allNodes.toArray(new DiscoveryNode[allNodes.size()])); + // add nodes to clusterStateServiceMap + updateNodes(state, clusterStateServiceMap); + return state; + } + + private void updateNodes(ClusterState state, Map clusterStateServiceMap) { + for (DiscoveryNode node : state.nodes()) { + clusterStateServiceMap.computeIfAbsent(node, discoveryNode -> { + IndicesClusterStateService ics = createIndicesClusterStateService(); + ics.start(); + return ics; + }); + } + + for (Iterator> it = clusterStateServiceMap.entrySet().iterator(); it.hasNext(); ) { + DiscoveryNode node = it.next().getKey(); + if (state.nodes().nodeExists(node.getId()) == false) { + it.remove(); + } + } + } + + public ClusterState randomlyUpdateClusterState(ClusterState state, + Map clusterStateServiceMap) { + // randomly create new indices (until we have 200 max) + for (int i = 0; i < randomInt(5); i++) { + if (state.metaData().indices().size() > 200) { + break; + } + String name = "index_" + randomAsciiOfLength(15).toLowerCase(Locale.ROOT); + CreateIndexRequest request = new CreateIndexRequest(name, Settings.builder() + .put(SETTING_NUMBER_OF_SHARDS, randomIntBetween(1, 3)) + .put(SETTING_NUMBER_OF_REPLICAS, randomInt(2)) + .build()); + state = cluster.createIndex(state, request); + assertTrue(state.metaData().hasIndex(name)); + } + + // randomly delete indices + Set indicesToDelete = new HashSet<>(); + int numberOfIndicesToDelete = randomInt(Math.min(2, state.metaData().indices().size())); + for (String index : randomSubsetOf(numberOfIndicesToDelete, state.metaData().indices().keys().toArray(String.class))) { + indicesToDelete.add(state.metaData().index(index).getIndex().getName()); + } + if (indicesToDelete.isEmpty() == false) { + DeleteIndexRequest deleteRequest = new DeleteIndexRequest(indicesToDelete.toArray(new String[indicesToDelete.size()])); + state = cluster.deleteIndices(state, deleteRequest); + for (String index : indicesToDelete) { + assertFalse(state.metaData().hasIndex(index)); + } + } + + // randomly close indices + int numberOfIndicesToClose = randomInt(Math.min(1, state.metaData().indices().size())); + for (String index : randomSubsetOf(numberOfIndicesToClose, state.metaData().indices().keys().toArray(String.class))) { + CloseIndexRequest closeIndexRequest = new CloseIndexRequest(state.metaData().index(index).getIndex().getName()); + state = cluster.closeIndices(state, closeIndexRequest); + } + + // randomly open indices + int numberOfIndicesToOpen = randomInt(Math.min(1, state.metaData().indices().size())); + for (String index : randomSubsetOf(numberOfIndicesToOpen, state.metaData().indices().keys().toArray(String.class))) { + OpenIndexRequest openIndexRequest = new OpenIndexRequest(state.metaData().index(index).getIndex().getName()); + state = cluster.openIndices(state, openIndexRequest); + } + + // randomly update settings + Set indicesToUpdate = new HashSet<>(); + boolean containsClosedIndex = false; + int numberOfIndicesToUpdate = randomInt(Math.min(2, state.metaData().indices().size())); + for (String index : randomSubsetOf(numberOfIndicesToUpdate, state.metaData().indices().keys().toArray(String.class))) { + indicesToUpdate.add(state.metaData().index(index).getIndex().getName()); + if (state.metaData().index(index).getState() == IndexMetaData.State.CLOSE) { + containsClosedIndex = true; + } + } + if (indicesToUpdate.isEmpty() == false) { + UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest( + indicesToUpdate.toArray(new String[indicesToUpdate.size()])); + Settings.Builder settings = Settings.builder(); + if (containsClosedIndex == false) { + settings.put(SETTING_NUMBER_OF_REPLICAS, randomInt(2)); + } + settings.put("index.refresh_interval", randomIntBetween(1, 5) + "s"); + updateSettingsRequest.settings(settings.build()); + state = cluster.updateSettings(state, updateSettingsRequest); + } + + // randomly reroute + if (rarely()) { + state = cluster.reroute(state, new ClusterRerouteRequest()); + } + + // randomly start and fail allocated shards + List startedShards = new ArrayList<>(); + List failedShards = new ArrayList<>(); + for (DiscoveryNode node : state.nodes()) { + IndicesClusterStateService indicesClusterStateService = clusterStateServiceMap.get(node); + MockIndicesService indicesService = (MockIndicesService) indicesClusterStateService.indicesService; + for (MockIndexService indexService : indicesService) { + for (MockIndexShard indexShard : indexService) { + ShardRouting persistedShardRouting = indexShard.routingEntry(); + if (persistedShardRouting.initializing() && randomBoolean()) { + startedShards.add(persistedShardRouting); + } else if (rarely()) { + failedShards.add(new FailedShard(persistedShardRouting, "fake shard failure", new Exception())); + } + } + } + } + state = cluster.applyFailedShards(state, failedShards); + state = cluster.applyStartedShards(state, startedShards); + + // randomly add and remove nodes (except current master) + if (rarely()) { + if (randomBoolean()) { + // add node + if (state.nodes().getSize() < 10) { + DiscoveryNodes newNodes = DiscoveryNodes.builder(state.nodes()).put(createNode()).build(); + state = ClusterState.builder(state).nodes(newNodes).build(); + state = cluster.reroute(state, new ClusterRerouteRequest()); // always reroute after node leave + updateNodes(state, clusterStateServiceMap); + } + } else { + // remove node + if (state.nodes().getDataNodes().size() > 3) { + DiscoveryNode discoveryNode = randomFrom(state.nodes().getNodes().values().toArray(DiscoveryNode.class)); + if (discoveryNode.equals(state.nodes().getMasterNode()) == false) { + DiscoveryNodes newNodes = DiscoveryNodes.builder(state.nodes()).remove(discoveryNode.getId()).build(); + state = ClusterState.builder(state).nodes(newNodes).build(); + state = cluster.reroute(state, new ClusterRerouteRequest()); // always reroute after node join + updateNodes(state, clusterStateServiceMap); + } + } + } + } + + // TODO: go masterless? + + return state; + } + + protected DiscoveryNode createNode(DiscoveryNode.Role... mustHaveRoles) { + Set roles = new HashSet<>(randomSubsetOf(Sets.newHashSet(DiscoveryNode.Role.values()))); + for (DiscoveryNode.Role mustHaveRole : mustHaveRoles) { + roles.add(mustHaveRole); + } + return new DiscoveryNode("node_" + randomAsciiOfLength(8), DummyTransportAddress.INSTANCE, Collections.emptyMap(), roles, + Version.CURRENT); + } + + private static ClusterState adaptClusterStateToLocalNode(ClusterState state, DiscoveryNode node) { + return ClusterState.builder(state).nodes(DiscoveryNodes.builder(state.nodes()).localNodeId(node.getId())).build(); + } + + private IndicesClusterStateService createIndicesClusterStateService() { + final ThreadPool threadPool = mock(ThreadPool.class); + final Executor executor = mock(Executor.class); + when(threadPool.generic()).thenReturn(executor); + final MockIndicesService indicesService = new MockIndicesService(); + final TransportService transportService = new TransportService(Settings.EMPTY, null, threadPool, null); + final ClusterService clusterService = mock(ClusterService.class); + final RepositoriesService repositoriesService = new RepositoriesService(Settings.EMPTY, clusterService, + transportService, null, null); + final RecoveryTargetService recoveryTargetService = new RecoveryTargetService(Settings.EMPTY, threadPool, + transportService, null, clusterService); + final ShardStateAction shardStateAction = mock(ShardStateAction.class); + return new IndicesClusterStateService(Settings.EMPTY, indicesService, clusterService, + threadPool, recoveryTargetService, shardStateAction, null, repositoriesService, null, null, null, null, null); + } + +}