Make IndicesClusterStateService unit testable (#17270)

Testability of ICSS is achieved by introducing interfaces for IndicesService, IndexService and IndexShard. These interfaces extract all relevant methods used by ICSS (which do not deal directly with store) and give the possibility to easily mock all the store behavior away in the tests (and cuts down on dependencies).
This commit is contained in:
Yannick Welsch 2016-06-10 12:47:41 +02:00 committed by GitHub
parent 62025d39d3
commit 6ea89004cd
19 changed files with 1556 additions and 567 deletions

View File

@ -482,7 +482,6 @@
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]indices[/\\]analysis[/\\]PreBuiltCacheFactory.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]indices[/\\]analysis[/\\]PreBuiltTokenFilters.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]indices[/\\]breaker[/\\]HierarchyCircuitBreakerService.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]indices[/\\]cluster[/\\]IndicesClusterStateService.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]indices[/\\]fielddata[/\\]cache[/\\]IndicesFieldDataCache.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]indices[/\\]fielddata[/\\]cache[/\\]IndicesFieldDataCacheListener.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]indices[/\\]flush[/\\]ShardsSyncedFlushResult.java" checks="LineLength" />

View File

@ -148,18 +148,11 @@ public class ClusterChangedEvent {
* has changed between the previous cluster state and the new cluster state.
* Note that this is an object reference equality test, not an equals test.
*/
public boolean indexMetaDataChanged(IndexMetaData current) {
MetaData previousMetaData = previousState.metaData();
if (previousMetaData == null) {
return true;
}
IndexMetaData previousIndexMetaData = previousMetaData.index(current.getIndex());
public static boolean indexMetaDataChanged(IndexMetaData metaData1, IndexMetaData metaData2) {
assert metaData1 != null && metaData2 != null;
// no need to check on version, since disco modules will make sure to use the
// same instance if its a version match
if (previousIndexMetaData == current) {
return false;
}
return true;
return metaData1 != metaData2;
}
/**

View File

@ -24,6 +24,7 @@ import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaDataMappingService;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
@ -58,13 +59,12 @@ public class NodeMappingRefreshAction extends AbstractComponent {
transportService.registerRequestHandler(ACTION_NAME, NodeMappingRefreshRequest::new, ThreadPool.Names.SAME, new NodeMappingRefreshTransportHandler());
}
public void nodeMappingRefresh(final ClusterState state, final NodeMappingRefreshRequest request) {
final DiscoveryNodes nodes = state.nodes();
if (nodes.getMasterNode() == null) {
public void nodeMappingRefresh(final DiscoveryNode masterNode, final NodeMappingRefreshRequest request) {
if (masterNode == null) {
logger.warn("can't send mapping refresh for [{}], no master known.", request.index());
return;
}
transportService.sendRequest(nodes.getMasterNode(), ACTION_NAME, request, EmptyTransportResponseHandler.INSTANCE_SAME);
transportService.sendRequest(masterNode, ACTION_NAME, request, EmptyTransportResponseHandler.INSTANCE_SAME);
}
private class NodeMappingRefreshTransportHandler implements TransportRequestHandler<NodeMappingRefreshRequest> {

View File

@ -67,6 +67,7 @@ import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.indices.AliasFilterParsingException;
import org.elasticsearch.indices.InvalidAliasNameException;
import org.elasticsearch.indices.cluster.IndicesClusterStateService;
import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache;
import org.elasticsearch.indices.mapper.MapperRegistry;
import org.elasticsearch.threadpool.ThreadPool;
@ -93,7 +94,7 @@ import static org.elasticsearch.common.collect.MapBuilder.newMapBuilder;
/**
*
*/
public final class IndexService extends AbstractIndexComponent implements IndexComponent, Iterable<IndexShard> {
public class IndexService extends AbstractIndexComponent implements IndicesClusterStateService.AllocatedIndex<IndexShard> {
private final IndexEventListener eventListener;
private final AnalysisService analysisService;
@ -184,8 +185,8 @@ public final class IndexService extends AbstractIndexComponent implements IndexC
/**
* Return the shard with the provided id, or null if there is no such shard.
*/
@Nullable
public IndexShard getShardOrNull(int shardId) {
@Override
public @Nullable IndexShard getShardOrNull(int shardId) {
return shards.get(shardId);
}
@ -359,6 +360,7 @@ public final class IndexService extends AbstractIndexComponent implements IndexC
return primary == false && IndexMetaData.isIndexUsingShadowReplicas(indexSettings);
}
@Override
public synchronized void removeShard(int shardId, String reason) {
final ShardId sId = new ShardId(index(), shardId);
final IndexShard indexShard;
@ -470,6 +472,11 @@ public final class IndexService extends AbstractIndexComponent implements IndexC
return searchOperationListeners;
}
@Override
public boolean updateMapping(IndexMetaData indexMetaData) throws IOException {
return mapperService().updateMapping(indexMetaData);
}
private class StoreCloseListener implements Store.OnClose {
private final ShardId shardId;
private final boolean ownsShard;
@ -617,6 +624,7 @@ public final class IndexService extends AbstractIndexComponent implements IndexC
return indexSettings.getIndexMetaData();
}
@Override
public synchronized void updateMetaData(final IndexMetaData metadata) {
final Translog.Durability oldTranslogDurability = indexSettings.getTranslogDurability();
if (indexSettings.updateIndexMetaData(metadata)) {

View File

@ -404,7 +404,7 @@ public final class IndexSettings {
*
* @return <code>true</code> iff any setting has been updated otherwise <code>false</code>.
*/
synchronized boolean updateIndexMetaData(IndexMetaData indexMetaData) {
public synchronized boolean updateIndexMetaData(IndexMetaData indexMetaData) {
final Settings newSettings = indexMetaData.getSettings();
if (version.equals(Version.indexCreated(newSettings)) == false) {
throw new IllegalArgumentException("version mismatch on settings update expected: " + version + " but was: " + Version.indexCreated(newSettings));

View File

@ -21,10 +21,13 @@ package org.elasticsearch.index.mapper;
import com.carrotsearch.hppc.ObjectHashSet;
import com.carrotsearch.hppc.cursors.ObjectCursor;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.DelegatingAnalyzerWrapper;
import org.elasticsearch.ElasticsearchGenerationException;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.common.compress.CompressedXContent;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.settings.Setting;
@ -32,6 +35,7 @@ import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.AbstractIndexComponent;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.analysis.AnalysisService;
import org.elasticsearch.index.mapper.Mapper.BuilderContext;
@ -183,6 +187,45 @@ public class MapperService extends AbstractIndexComponent {
}
}
public boolean updateMapping(IndexMetaData indexMetaData) throws IOException {
assert indexMetaData.getIndex().equals(index()) : "index mismatch: expected " + index() + " but was " + indexMetaData.getIndex();
// go over and add the relevant mappings (or update them)
boolean requireRefresh = false;
for (ObjectCursor<MappingMetaData> cursor : indexMetaData.getMappings().values()) {
MappingMetaData mappingMd = cursor.value;
String mappingType = mappingMd.type();
CompressedXContent mappingSource = mappingMd.source();
// refresh mapping can happen when the parsing/merging of the mapping from the metadata doesn't result in the same
// mapping, in this case, we send to the master to refresh its own version of the mappings (to conform with the
// merge version of it, which it does when refreshing the mappings), and warn log it.
try {
DocumentMapper existingMapper = documentMapper(mappingType);
if (existingMapper == null || mappingSource.equals(existingMapper.mappingSource()) == false) {
String op = existingMapper == null ? "adding" : "updating";
if (logger.isDebugEnabled() && mappingSource.compressed().length < 512) {
logger.debug("[{}] {} mapping [{}], source [{}]", index(), op, mappingType, mappingSource.string());
} else if (logger.isTraceEnabled()) {
logger.trace("[{}] {} mapping [{}], source [{}]", index(), op, mappingType, mappingSource.string());
} else {
logger.debug("[{}] {} mapping [{}] (source suppressed due to length, use TRACE level if needed)", index(), op,
mappingType);
}
merge(mappingType, mappingSource, MergeReason.MAPPING_RECOVERY, true);
if (!documentMapper(mappingType).mappingSource().equals(mappingSource)) {
logger.debug("[{}] parsed mapping [{}], and got different sources\noriginal:\n{}\nparsed:\n{}", index(),
mappingType, mappingSource, documentMapper(mappingType).mappingSource());
requireRefresh = true;
}
}
} catch (Throwable e) {
logger.warn("[{}] failed to add mapping [{}], source [{}]", e, index(), mappingType, mappingSource);
throw e;
}
}
return requireRefresh;
}
//TODO: make this atomic
public void merge(Map<String, Map<String, Object>> mappings, boolean updateAllTypes) throws MapperParsingException {
// first, add the default mapping

View File

@ -37,8 +37,6 @@ import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest;
import org.elasticsearch.action.admin.indices.upgrade.post.UpgradeRequest;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.RestoreSource;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.common.Booleans;
@ -108,6 +106,7 @@ import org.elasticsearch.index.warmer.ShardIndexWarmerService;
import org.elasticsearch.index.warmer.WarmerStats;
import org.elasticsearch.indices.IndexingMemoryController;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.cluster.IndicesClusterStateService;
import org.elasticsearch.indices.recovery.RecoveryFailedException;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.indices.recovery.RecoveryTargetService;
@ -136,7 +135,7 @@ import java.util.function.Consumer;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
public class IndexShard extends AbstractIndexShardComponent {
public class IndexShard extends AbstractIndexShardComponent implements IndicesClusterStateService.Shard {
private final ThreadPool threadPool;
private final MapperService mapperService;
@ -338,6 +337,7 @@ public class IndexShard extends AbstractIndexShardComponent {
/**
* Returns the latest cluster routing entry received with this shard.
*/
@Override
public ShardRouting routingEntry() {
return this.shardRouting;
}
@ -348,13 +348,12 @@ public class IndexShard extends AbstractIndexShardComponent {
/**
* Updates the shards routing entry. This mutate the shards internal state depending
* on the changes that get introduced by the new routing value. This method will persist shard level metadata
* unless explicitly disabled.
* on the changes that get introduced by the new routing value. This method will persist shard level metadata.
*
* @throws IndexShardRelocatedException if shard is marked as relocated and relocation aborted
* @throws IOException if shard state could not be persisted
*/
public void updateRoutingEntry(final ShardRouting newRouting, final boolean persistState) throws IOException {
public void updateRoutingEntry(final ShardRouting newRouting) throws IOException {
final ShardRouting currentRouting = this.shardRouting;
if (!newRouting.shardId().equals(shardId())) {
throw new IllegalArgumentException("Trying to set a routing entry with shardId " + newRouting.shardId() + " on a shard with shardId " + shardId() + "");
@ -408,9 +407,7 @@ public class IndexShard extends AbstractIndexShardComponent {
}
this.shardRouting = newRouting;
indexEventListener.shardRoutingChanged(this, currentRouting, newRouting);
if (persistState) {
persistMetadata(newRouting, currentRouting);
}
persistMetadata(newRouting, currentRouting);
}
/**
@ -589,7 +586,7 @@ public class IndexShard extends AbstractIndexShardComponent {
*/
public void refresh(String source) {
verifyNotClosed();
if (canIndex()) {
long bytes = getEngine().getIndexBufferRAMBytesUsed();
writingBytes.addAndGet(bytes);
@ -1370,35 +1367,36 @@ public class IndexShard extends AbstractIndexShardComponent {
return this.currentEngineReference.get();
}
public void startRecovery(DiscoveryNode localNode, DiscoveryNode sourceNode, RecoveryTargetService recoveryTargetService,
public void startRecovery(RecoveryState recoveryState, RecoveryTargetService recoveryTargetService,
RecoveryTargetService.RecoveryListener recoveryListener, RepositoriesService repositoriesService,
BiConsumer<String, MappingMetaData> mappingUpdateConsumer, IndicesService indicesService) {
final RestoreSource restoreSource = shardRouting.restoreSource();
if (shardRouting.isPeerRecovery()) {
assert sourceNode != null : "peer recovery started but sourceNode is null";
// we don't mark this one as relocated at the end.
// For primaries: requests in any case are routed to both when its relocating and that way we handle
// the edge case where its mark as relocated, and we might need to roll it back...
// For replicas: we are recovering a backup from a primary
RecoveryState.Type type = shardRouting.primary() ? RecoveryState.Type.PRIMARY_RELOCATION : RecoveryState.Type.REPLICA;
RecoveryState recoveryState = new RecoveryState(shardId(), shardRouting.primary(), type, sourceNode, localNode);
try {
markAsRecovering("from " + sourceNode, recoveryState);
recoveryTargetService.startRecovery(this, type, sourceNode, recoveryListener);
} catch (Throwable e) {
failShard("corrupted preexisting index", e);
recoveryListener.onRecoveryFailure(recoveryState, new RecoveryFailedException(shardId, sourceNode, localNode, e), true);
}
} else if (restoreSource == null) {
// recover from filesystem store
IndexMetaData indexMetaData = indexSettings().getIndexMetaData();
Index mergeSourceIndex = indexMetaData.getMergeSourceIndex();
final boolean recoverFromLocalShards = mergeSourceIndex != null && shardRouting.allocatedPostIndexCreate(indexMetaData) == false && shardRouting.primary();
final RecoveryState recoveryState = new RecoveryState(shardId(), shardRouting.primary(),
recoverFromLocalShards ? RecoveryState.Type.LOCAL_SHARDS : RecoveryState.Type.STORE, localNode, localNode);
if (recoverFromLocalShards) {
BiConsumer<String, MappingMetaData> mappingUpdateConsumer,
IndicesService indicesService) {
switch (recoveryState.getType()) {
case PRIMARY_RELOCATION:
case REPLICA:
try {
markAsRecovering("from " + recoveryState.getSourceNode(), recoveryState);
recoveryTargetService.startRecovery(this, recoveryState.getType(), recoveryState.getSourceNode(), recoveryListener);
} catch (Throwable e) {
failShard("corrupted preexisting index", e);
recoveryListener.onRecoveryFailure(recoveryState, new RecoveryFailedException(recoveryState, null, e), true);
}
break;
case STORE:
markAsRecovering("from store", recoveryState); // mark the shard as recovering on the cluster state thread
threadPool.generic().execute(() -> {
try {
if (recoverFromStore()) {
recoveryListener.onRecoveryDone(recoveryState);
}
} catch (Throwable t) {
recoveryListener.onRecoveryFailure(recoveryState, new RecoveryFailedException(recoveryState, null, t), true);
}
});
break;
case LOCAL_SHARDS:
final IndexMetaData indexMetaData = indexSettings().getIndexMetaData();
final Index mergeSourceIndex = indexMetaData.getMergeSourceIndex();
final List<IndexShard> startedShards = new ArrayList<>();
final IndexService sourceIndexService = indicesService.indexService(mergeSourceIndex);
final int numShards = sourceIndexService != null ? sourceIndexService.getIndexSettings().getNumberOfShards() : -1;
@ -1414,14 +1412,14 @@ public class IndexShard extends AbstractIndexShardComponent {
threadPool.generic().execute(() -> {
try {
final Set<ShardId> shards = IndexMetaData.selectShrinkShards(shardId().id(), sourceIndexService.getMetaData(),
indexMetaData.getNumberOfShards());
+ indexMetaData.getNumberOfShards());
if (recoverFromLocalShards(mappingUpdateConsumer, startedShards.stream()
.filter((s) -> shards.contains(s.shardId())).collect(Collectors.toList()))) {
recoveryListener.onRecoveryDone(recoveryState);
}
} catch (Throwable t) {
recoveryListener.onRecoveryFailure(recoveryState,
new RecoveryFailedException(shardId, localNode, localNode, t), true);
new RecoveryFailedException(recoveryState, null, t), true);
}
});
} else {
@ -1433,36 +1431,25 @@ public class IndexShard extends AbstractIndexShardComponent {
+ " are started yet, expected " + numShards + " found " + startedShards.size() + " can't recover shard "
+ shardId());
}
recoveryListener.onRecoveryFailure(recoveryState, new RecoveryFailedException(shardId, localNode, localNode, t), true);
recoveryListener.onRecoveryFailure(recoveryState, new RecoveryFailedException(recoveryState, null, t), true);
}
} else {
markAsRecovering("from store", recoveryState); // mark the shard as recovering on the cluster state thread
break;
case SNAPSHOT:
markAsRecovering("from snapshot", recoveryState); // mark the shard as recovering on the cluster state thread
threadPool.generic().execute(() -> {
try {
if (recoverFromStore()) {
final IndexShardRepository indexShardRepository = repositoriesService.indexShardRepository(
recoveryState.getRestoreSource().snapshot().getRepository());
if (restoreFromRepository(indexShardRepository)) {
recoveryListener.onRecoveryDone(recoveryState);
}
} catch (Throwable t) {
recoveryListener.onRecoveryFailure(recoveryState, new RecoveryFailedException(shardId, sourceNode, localNode, t), true);
} catch (Throwable first) {
recoveryListener.onRecoveryFailure(recoveryState, new RecoveryFailedException(recoveryState, null, first), true);
}
});
}
} else {
// recover from a restore
final RecoveryState recoveryState = new RecoveryState(shardId(), shardRouting.primary(),
RecoveryState.Type.SNAPSHOT, shardRouting.restoreSource(), localNode);
markAsRecovering("from snapshot", recoveryState); // mark the shard as recovering on the cluster state thread
threadPool.generic().execute(() -> {
try {
final IndexShardRepository indexShardRepository = repositoriesService.indexShardRepository(restoreSource.snapshot().getRepository());
if (restoreFromRepository(indexShardRepository)) {
recoveryListener.onRecoveryDone(recoveryState);
}
} catch (Throwable first) {
recoveryListener.onRecoveryFailure(recoveryState, new RecoveryFailedException(shardId, sourceNode, localNode, first), true);
}
});
break;
default:
throw new IllegalArgumentException("Unknown recovery type " + recoveryState.getType());
}
}
@ -1472,7 +1459,7 @@ public class IndexShard extends AbstractIndexShardComponent {
// called by the current engine
@Override
public void onFailedEngine(String reason, @Nullable Throwable failure) {
final ShardFailure shardFailure = new ShardFailure(shardRouting, reason, failure, getIndexUUID());
final ShardFailure shardFailure = new ShardFailure(shardRouting, reason, failure);
for (Callback<ShardFailure> listener : delegates) {
try {
listener.handle(shardFailure);
@ -1661,13 +1648,11 @@ public class IndexShard extends AbstractIndexShardComponent {
public final String reason;
@Nullable
public final Throwable cause;
public final String indexUUID;
public ShardFailure(ShardRouting routing, String reason, @Nullable Throwable cause, String indexUUID) {
public ShardFailure(ShardRouting routing, String reason, @Nullable Throwable cause) {
this.routing = routing;
this.reason = reason;
this.cause = cause;
this.indexUUID = indexUUID;
}
}

View File

@ -59,16 +59,16 @@ public final class ShadowIndexShard extends IndexShard {
/**
* In addition to the regular accounting done in
* {@link IndexShard#updateRoutingEntry(ShardRouting, boolean)},
* {@link IndexShard#updateRoutingEntry(ShardRouting)},
* if this shadow replica needs to be promoted to a primary, the shard is
* failed in order to allow a new primary to be re-allocated.
*/
@Override
public void updateRoutingEntry(ShardRouting newRouting, boolean persistState) throws IOException {
public void updateRoutingEntry(ShardRouting newRouting) throws IOException {
if (newRouting.primary() == true) {// becoming a primary
throw new IllegalStateException("can't promote shard to primary");
}
super.updateRoutingEntry(newRouting, persistState);
super.updateRoutingEntry(newRouting);
}
@Override

View File

@ -35,6 +35,7 @@ import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.breaker.CircuitBreaker;
@ -55,6 +56,7 @@ import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.Callback;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.iterable.Iterables;
import org.elasticsearch.env.NodeEnvironment;
@ -86,10 +88,14 @@ import org.elasticsearch.index.shard.IndexingStats;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.IndexStoreConfig;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.indices.cluster.IndicesClusterStateService;
import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache;
import org.elasticsearch.indices.mapper.MapperRegistry;
import org.elasticsearch.indices.query.IndicesQueriesRegistry;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.indices.recovery.RecoveryTargetService;
import org.elasticsearch.plugins.PluginsService;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.search.internal.ShardSearchRequest;
import org.elasticsearch.search.query.QueryPhase;
@ -124,7 +130,8 @@ import static org.elasticsearch.common.util.CollectionUtils.arrayAsArrayList;
/**
*
*/
public class IndicesService extends AbstractLifecycleComponent<IndicesService> implements Iterable<IndexService>, IndexService.ShardStoreDeleter {
public class IndicesService extends AbstractLifecycleComponent<IndicesService>
implements IndicesClusterStateService.AllocatedIndices<IndexShard, IndexService>, IndexService.ShardStoreDeleter {
public static final String INDICES_SHARDS_CLOSED_TIMEOUT = "indices.shards_closed_timeout";
public static final Setting<TimeValue> INDICES_CACHE_CLEAN_INTERVAL_SETTING =
@ -296,11 +303,14 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService> i
}
/**
* Returns <tt>true</tt> if changes (adding / removing) indices, shards and so on are allowed.
* Checks if changes (adding / removing) indices, shards and so on are allowed.
*
* @throws IllegalStateException if no changes allowed.
*/
public boolean changesAllowed() {
// we check on stop here since we defined stop when we delete the indices
return lifecycle.started();
private void ensureChangesAllowed() {
if (lifecycle.started() == false) {
throw new IllegalStateException("Can't make changes to indices service, node is closed");
}
}
@Override
@ -314,10 +324,9 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService> i
/**
* Returns an IndexService for the specified index if exists otherwise returns <code>null</code>.
*
*/
@Nullable
public IndexService indexService(Index index) {
@Override
public @Nullable IndexService indexService(Index index) {
return indices.get(index.getUUID());
}
@ -339,11 +348,9 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService> i
* @param builtInListeners a list of built-in lifecycle {@link IndexEventListener} that should should be used along side with the per-index listeners
* @throws IndexAlreadyExistsException if the index already exists.
*/
@Override
public synchronized IndexService createIndex(final NodeServicesProvider nodeServicesProvider, IndexMetaData indexMetaData, List<IndexEventListener> builtInListeners) throws IOException {
if (!lifecycle.started()) {
throw new IllegalStateException("Can't create an index [" + indexMetaData.getIndex() + "], node is closed");
}
ensureChangesAllowed();
if (indexMetaData.getIndexUUID().equals(IndexMetaData.INDEX_UUID_NA_VALUE)) {
throw new IllegalArgumentException("index must have a real UUID found value: [" + indexMetaData.getIndexUUID() + "]");
}
@ -424,14 +431,44 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService> i
}
}
@Override
public IndexShard createShard(ShardRouting shardRouting, RecoveryState recoveryState, RecoveryTargetService recoveryTargetService,
RecoveryTargetService.RecoveryListener recoveryListener, RepositoriesService repositoriesService,
NodeServicesProvider nodeServicesProvider, Callback<IndexShard.ShardFailure> onShardFailure) throws IOException {
ensureChangesAllowed();
IndexService indexService = indexService(shardRouting.index());
IndexShard indexShard = indexService.createShard(shardRouting);
indexShard.addShardFailureCallback(onShardFailure);
indexShard.startRecovery(recoveryState, recoveryTargetService, recoveryListener, repositoriesService,
(type, mapping) -> {
assert recoveryState.getType() == RecoveryState.Type.LOCAL_SHARDS :
"mapping update consumer only required by local shards recovery";
try {
nodeServicesProvider.getClient().admin().indices().preparePutMapping()
.setConcreteIndex(shardRouting.index()) // concrete index - no name clash, it uses uuid
.setType(type)
.setSource(mapping.source().string())
.get();
} catch (IOException ex) {
throw new ElasticsearchException("failed to stringify mapping source", ex);
}
}, this);
return indexShard;
}
/**
* Removes the given index from this service and releases all associated resources. Persistent parts of the index
* like the shards files, state and transaction logs are kept around in the case of a disaster recovery.
* @param index the index to remove
* @param reason the high level reason causing this removal
*/
@Override
public void removeIndex(Index index, String reason) {
removeIndex(index, reason, false);
try {
removeIndex(index, reason, false);
} catch (Throwable e) {
logger.warn("failed to remove index ({})", e, reason);
}
}
private void removeIndex(Index index, String reason, boolean delete) {
@ -516,14 +553,20 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService> i
* @param index the index to delete
* @param reason the high level reason causing this delete
*/
public void deleteIndex(Index index, String reason) throws IOException {
removeIndex(index, reason, true);
@Override
public void deleteIndex(Index index, String reason) {
try {
removeIndex(index, reason, true);
} catch (Throwable e) {
logger.warn("failed to delete index ({})", e, reason);
}
}
/**
* Deletes an index that is not assigned to this node. This method cleans up all disk folders relating to the index
* but does not deal with in-memory structures. For those call {@link #deleteIndex(Index, String)}
*/
@Override
public void deleteUnassignedIndex(String reason, IndexMetaData metaData, ClusterState clusterState) {
if (nodeEnv.hasNodeFile()) {
String indexName = metaData.getIndex().getName();
@ -683,8 +726,8 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService> i
* @param clusterState {@code ClusterState} to ensure the index is not part of it
* @return IndexMetaData for the index loaded from disk
*/
@Nullable
public IndexMetaData verifyIndexIsDeleted(final Index index, final ClusterState clusterState) {
@Override
public @Nullable IndexMetaData verifyIndexIsDeleted(final Index index, final ClusterState clusterState) {
// this method should only be called when we know the index (name + uuid) is not part of the cluster state
if (clusterState.metaData().index(index) != null) {
throw new IllegalStateException("Cannot delete index [" + index + "], it is still part of the cluster state.");
@ -839,6 +882,7 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService> i
* @param index the index to process the pending deletes for
* @param timeout the timeout used for processing pending deletes
*/
@Override
public void processPendingDeletes(Index index, IndexSettings indexSettings, TimeValue timeout) throws IOException, InterruptedException {
logger.debug("{} processing pending deletes", index);
final long startTimeNS = System.nanoTime();

View File

@ -0,0 +1,38 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.support.master;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.cluster.ClusterState;
public class TransportMasterNodeActionUtils {
/**
* Allows to directly call {@link TransportMasterNodeAction#masterOperation(MasterNodeRequest, ClusterState, ActionListener)} which is
* a protected method.
*/
public static <Request extends MasterNodeRequest<Request>, Response extends ActionResponse> void runMasterOperation(
TransportMasterNodeAction<Request, Response> masterNodeAction, Request request, ClusterState clusterState,
ActionListener<Response> actionListener) throws Exception {
assert masterNodeAction.checkBlock(request, clusterState) == null;
masterNodeAction.masterOperation(request, clusterState, actionListener);
}
}

View File

@ -140,24 +140,24 @@ public class ClusterChangedEventTests extends ESTestCase {
*/
public void testIndexMetaDataChange() {
final int numNodesInCluster = 3;
final ClusterState originalState = createState(numNodesInCluster, randomBoolean(), initialIndices);
final ClusterState newState = originalState; // doesn't matter for this test, just need a non-null value
final ClusterChangedEvent event = new ClusterChangedEvent("_na_", originalState, newState);
final ClusterState state = createState(numNodesInCluster, randomBoolean(), initialIndices);
// test when its not the same IndexMetaData
final Index index = initialIndices.get(0);
final IndexMetaData originalIndexMeta = originalState.metaData().index(index);
final IndexMetaData originalIndexMeta = state.metaData().index(index);
// make sure the metadata is actually on the cluster state
assertNotNull("IndexMetaData for " + index + " should exist on the cluster state", originalIndexMeta);
IndexMetaData newIndexMeta = createIndexMetadata(index, originalIndexMeta.getVersion() + 1);
assertTrue("IndexMetaData with different version numbers must be considered changed", event.indexMetaDataChanged(newIndexMeta));
assertTrue("IndexMetaData with different version numbers must be considered changed",
ClusterChangedEvent.indexMetaDataChanged(originalIndexMeta, newIndexMeta));
// test when it doesn't exist
newIndexMeta = createIndexMetadata(new Index("doesntexist", UUIDs.randomBase64UUID()));
assertTrue("IndexMetaData that didn't previously exist should be considered changed", event.indexMetaDataChanged(newIndexMeta));
assertTrue("IndexMetaData that didn't previously exist should be considered changed",
ClusterChangedEvent.indexMetaDataChanged(originalIndexMeta, newIndexMeta));
// test when its the same IndexMetaData
assertFalse("IndexMetaData should be the same", event.indexMetaDataChanged(originalIndexMeta));
assertFalse("IndexMetaData should be the same", ClusterChangedEvent.indexMetaDataChanged(originalIndexMeta, originalIndexMeta));
}
/**

View File

@ -161,7 +161,7 @@ public class RandomAllocationDeciderTests extends ESAllocationTestCase {
}
}
private static final class RandomAllocationDecider extends AllocationDecider {
public static final class RandomAllocationDecider extends AllocationDecider {
private final Random random;

View File

@ -217,24 +217,14 @@ public class IndexShardTests extends ESSingleNodeTestCase {
ShardStateMetaData shardStateMetaData = load(logger, env.availableShardPaths(shard.shardId));
assertEquals(getShardStateMetadata(shard), shardStateMetaData);
ShardRouting routing = shard.shardRouting;
shard.updateRoutingEntry(routing, true);
shard.updateRoutingEntry(routing);
shardStateMetaData = load(logger, env.availableShardPaths(shard.shardId));
assertEquals(shardStateMetaData, getShardStateMetadata(shard));
assertEquals(shardStateMetaData, new ShardStateMetaData(routing.primary(), shard.indexSettings().getUUID(), routing.allocationId()));
// check that we don't write shard state metadata if persist == false
ShardRouting updatedRouting = shard.shardRouting;
updatedRouting = TestShardRouting.relocate(shard.shardRouting, "some node", 42L);
shard.updateRoutingEntry(updatedRouting, false);
shardStateMetaData = load(logger, env.availableShardPaths(shard.shardId));
assertFalse("shard state persisted despite of persist=false", shardStateMetaData.equals(getShardStateMetadata(shard)));
assertEquals("shard state persisted despite of persist=false", shardStateMetaData, new ShardStateMetaData(routing.primary(), shard.indexSettings().getUUID(), routing.allocationId()));
// check that we write shard state metadata if persist == true
shard.updateRoutingEntry(routing, false); // move back state in IndexShard
routing = updatedRouting;
shard.updateRoutingEntry(routing, true);
routing = TestShardRouting.relocate(shard.shardRouting, "some node", 42L);
shard.updateRoutingEntry(routing);
shardStateMetaData = load(logger, env.availableShardPaths(shard.shardId));
assertEquals(shardStateMetaData, getShardStateMetadata(shard));
assertEquals(shardStateMetaData, new ShardStateMetaData(routing.primary(), shard.indexSettings().getUUID(), routing.allocationId()));
@ -336,7 +326,7 @@ public class IndexShardTests extends ESSingleNodeTestCase {
// simulate promotion
ShardRouting newReplicaShardRouting = TestShardRouting.newShardRouting(temp.shardId(), temp.currentNodeId(), null,
false, ShardRoutingState.STARTED, temp.allocationId());
indexShard.updateRoutingEntry(newReplicaShardRouting, false);
indexShard.updateRoutingEntry(newReplicaShardRouting);
primaryTerm = primaryTerm + 1;
indexShard.updatePrimaryTerm(primaryTerm);
newPrimaryShardRouting = TestShardRouting.newShardRouting(temp.shardId(), temp.currentNodeId(), null,
@ -344,7 +334,7 @@ public class IndexShardTests extends ESSingleNodeTestCase {
} else {
newPrimaryShardRouting = temp;
}
indexShard.updateRoutingEntry(newPrimaryShardRouting, false);
indexShard.updateRoutingEntry(newPrimaryShardRouting);
assertEquals(0, indexShard.getActiveOperationsCount());
if (newPrimaryShardRouting.isRelocationTarget() == false) {
@ -381,7 +371,7 @@ public class IndexShardTests extends ESSingleNodeTestCase {
newShardRouting = TestShardRouting.newShardRouting(temp.shardId(), temp.currentNodeId(), null,
false, ShardRoutingState.STARTED, AllocationId.newRelocation(temp.allocationId()));
indexShard.updateRoutingEntry(newShardRouting, false);
indexShard.updateRoutingEntry(newShardRouting);
break;
case 1:
// initializing replica / primary
@ -391,13 +381,13 @@ public class IndexShardTests extends ESSingleNodeTestCase {
relocating ? randomBoolean() : false,
ShardRoutingState.INITIALIZING,
relocating ? AllocationId.newRelocation(temp.allocationId()) : temp.allocationId());
indexShard.updateRoutingEntry(newShardRouting, false);
indexShard.updateRoutingEntry(newShardRouting);
break;
case 2:
// relocation source
newShardRouting = TestShardRouting.newShardRouting(temp.shardId(), temp.currentNodeId(), "otherNode",
false, ShardRoutingState.RELOCATING, AllocationId.newRelocation(temp.allocationId()));
indexShard.updateRoutingEntry(newShardRouting, false);
indexShard.updateRoutingEntry(newShardRouting);
indexShard.relocated("test");
break;
default:
@ -983,7 +973,7 @@ public class IndexShardTests extends ESSingleNodeTestCase {
test.removeShard(0, "b/c simon says so");
routing = ShardRoutingHelper.reinit(routing);
IndexShard newShard = test.createShard(routing);
newShard.updateRoutingEntry(routing, false);
newShard.updateRoutingEntry(routing);
DiscoveryNode localNode = new DiscoveryNode("foo", DummyTransportAddress.INSTANCE, emptyMap(), emptySet(), Version.CURRENT);
newShard.markAsRecovering("store", new RecoveryState(newShard.shardId(), routing.primary(), RecoveryState.Type.STORE, localNode, localNode));
assertTrue(newShard.recoverFromStore());
@ -991,7 +981,7 @@ public class IndexShardTests extends ESSingleNodeTestCase {
assertEquals(translogOps, newShard.recoveryState().getTranslog().totalOperations());
assertEquals(translogOps, newShard.recoveryState().getTranslog().totalOperationsOnStart());
assertEquals(100.0f, newShard.recoveryState().getTranslog().recoveredPercent(), 0.01f);
newShard.updateRoutingEntry(routing.moveToStarted(), true);
newShard.updateRoutingEntry(routing.moveToStarted());
SearchResponse response = client().prepareSearch().get();
assertHitCount(response, 1);
}
@ -1010,7 +1000,7 @@ public class IndexShardTests extends ESSingleNodeTestCase {
test.removeShard(0, "b/c simon says so");
routing = ShardRoutingHelper.reinit(routing, UnassignedInfo.Reason.INDEX_CREATED);
IndexShard newShard = test.createShard(routing);
newShard.updateRoutingEntry(routing, false);
newShard.updateRoutingEntry(routing);
DiscoveryNode localNode = new DiscoveryNode("foo", DummyTransportAddress.INSTANCE, emptyMap(), emptySet(), Version.CURRENT);
newShard.markAsRecovering("store", new RecoveryState(newShard.shardId(), routing.primary(), RecoveryState.Type.STORE, localNode,
localNode));
@ -1019,7 +1009,7 @@ public class IndexShardTests extends ESSingleNodeTestCase {
assertEquals(0, newShard.recoveryState().getTranslog().totalOperations());
assertEquals(0, newShard.recoveryState().getTranslog().totalOperationsOnStart());
assertEquals(100.0f, newShard.recoveryState().getTranslog().recoveredPercent(), 0.01f);
newShard.updateRoutingEntry(getInitializingShardRouting(routing).moveToStarted(), true);
newShard.updateRoutingEntry(getInitializingShardRouting(routing).moveToStarted());
SearchResponse response = client().prepareSearch().get();
assertHitCount(response, 0);
}
@ -1045,7 +1035,7 @@ public class IndexShardTests extends ESSingleNodeTestCase {
store.decRef();
routing = ShardRoutingHelper.reinit(routing);
IndexShard newShard = test.createShard(routing);
newShard.updateRoutingEntry(routing, false);
newShard.updateRoutingEntry(routing);
newShard.markAsRecovering("store", new RecoveryState(newShard.shardId(), routing.primary(), RecoveryState.Type.STORE, localNode, localNode));
try {
newShard.recoverFromStore();
@ -1065,11 +1055,11 @@ public class IndexShardTests extends ESSingleNodeTestCase {
}
test.removeShard(0, "I broken it");
newShard = test.createShard(routing);
newShard.updateRoutingEntry(routing, false);
newShard.updateRoutingEntry(routing);
newShard.markAsRecovering("store", new RecoveryState(newShard.shardId(), routing.primary(), RecoveryState.Type.STORE, localNode, localNode));
assertTrue("recover even if there is nothing to recover", newShard.recoverFromStore());
newShard.updateRoutingEntry(getInitializingShardRouting(routing).moveToStarted(), true);
newShard.updateRoutingEntry(getInitializingShardRouting(routing).moveToStarted());
SearchResponse response = client().prepareSearch().get();
assertHitCount(response, 0);
// we can't issue this request through a client because of the inconsistencies we created with the cluster state
@ -1090,11 +1080,11 @@ public class IndexShardTests extends ESSingleNodeTestCase {
ShardRouting origRouting = shard.routingEntry();
assertThat(shard.state(), equalTo(IndexShardState.STARTED));
ShardRouting inRecoveryRouting = ShardRoutingHelper.relocate(origRouting, "some_node");
shard.updateRoutingEntry(inRecoveryRouting, true);
shard.updateRoutingEntry(inRecoveryRouting);
shard.relocated("simulate mark as relocated");
assertThat(shard.state(), equalTo(IndexShardState.RELOCATED));
try {
shard.updateRoutingEntry(origRouting, true);
shard.updateRoutingEntry(origRouting);
fail("Expected IndexShardRelocatedException");
} catch (IndexShardRelocatedException expected) {
}
@ -1123,7 +1113,7 @@ public class IndexShardTests extends ESSingleNodeTestCase {
Store sourceStore = test_shard.store();
Store targetStore = test_target_shard.store();
test_target_shard.updateRoutingEntry(routing, false);
test_target_shard.updateRoutingEntry(routing);
DiscoveryNode localNode = new DiscoveryNode("foo", DummyTransportAddress.INSTANCE, emptyMap(), emptySet(), Version.CURRENT);
test_target_shard.markAsRecovering("store", new RecoveryState(routing.shardId(), routing.primary(), RecoveryState.Type.SNAPSHOT, routing.restoreSource(), localNode));
assertTrue(test_target_shard.restoreFromRepository(new IndexShardRepository() {
@ -1156,7 +1146,7 @@ public class IndexShardTests extends ESSingleNodeTestCase {
}
}));
test_target_shard.updateRoutingEntry(routing.moveToStarted(), true);
test_target_shard.updateRoutingEntry(routing.moveToStarted());
assertHitCount(client().prepareSearch("test_target").get(), 1);
assertSearchHits(client().prepareSearch("test_target").get(), "0");
}
@ -1411,7 +1401,7 @@ public class IndexShardTests extends ESSingleNodeTestCase {
DiscoveryNode localNode = new DiscoveryNode("foo", DummyTransportAddress.INSTANCE, emptyMap(), emptySet(), Version.CURRENT);
newShard.markAsRecovering("store", new RecoveryState(newShard.shardId(), newShard.routingEntry().primary(), RecoveryState.Type.STORE, localNode, localNode));
assertTrue(newShard.recoverFromStore());
newShard.updateRoutingEntry(newShard.routingEntry().moveToStarted(), true);
newShard.updateRoutingEntry(newShard.routingEntry().moveToStarted());
return newShard;
}
@ -1543,7 +1533,7 @@ public class IndexShardTests extends ESSingleNodeTestCase {
DiscoveryNode localNode = new DiscoveryNode("foo", DummyTransportAddress.INSTANCE, emptyMap(), emptySet(), Version.CURRENT);
{
final IndexShard newShard = test.createShard(routing);
newShard.updateRoutingEntry(routing, false);
newShard.updateRoutingEntry(routing);
newShard.markAsRecovering("store", new RecoveryState(newShard.shardId(), routing.primary(), RecoveryState.Type.LOCAL_SHARDS, localNode, localNode));
BiConsumer<String, MappingMetaData> mappingConsumer = (type, mapping) -> {
@ -1575,7 +1565,7 @@ public class IndexShardTests extends ESSingleNodeTestCase {
}
}
routing = ShardRoutingHelper.moveToStarted(routing);
newShard.updateRoutingEntry(routing, true);
newShard.updateRoutingEntry(routing);
assertHitCount(client().prepareSearch("index_1").get(), 2);
}
// now check that it's persistent ie. that the added shards are committed
@ -1587,7 +1577,7 @@ public class IndexShardTests extends ESSingleNodeTestCase {
newShard.markAsRecovering("store", new RecoveryState(newShard.shardId(), routing.primary(), RecoveryState.Type.LOCAL_SHARDS, localNode, localNode));
assertTrue(newShard.recoverFromStore());
routing = ShardRoutingHelper.moveToStarted(routing);
newShard.updateRoutingEntry(routing, true);
newShard.updateRoutingEntry(routing);
assertHitCount(client().prepareSearch("index_1").get(), 2);
}

View File

@ -455,7 +455,7 @@ public class IndexingMemoryControllerTests extends ESSingleNodeTestCase {
assertEquals(1, imc.availableShards().size());
assertTrue(newShard.recoverFromStore());
assertTrue("we should have flushed in IMC at least once but did: " + flushes.get(), flushes.get() >= 1);
newShard.updateRoutingEntry(routing.moveToStarted(), true);
newShard.updateRoutingEntry(routing.moveToStarted());
} finally {
newShard.close("simon says", false);
}

View File

@ -102,13 +102,13 @@ public class IndicesLifecycleListenerSingleNodeTests extends ESSingleNodeTestCas
newRouting = ShardRoutingHelper.moveToUnassigned(newRouting, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "boom"));
newRouting = ShardRoutingHelper.initialize(newRouting, nodeId);
IndexShard shard = index.createShard(newRouting);
shard.updateRoutingEntry(newRouting, true);
shard.updateRoutingEntry(newRouting);
final DiscoveryNode localNode = new DiscoveryNode("foo", DummyTransportAddress.INSTANCE,
emptyMap(), emptySet(), Version.CURRENT);
shard.markAsRecovering("store", new RecoveryState(shard.shardId(), newRouting.primary(), RecoveryState.Type.SNAPSHOT, newRouting.restoreSource(), localNode));
shard.recoverFromStore();
newRouting = ShardRoutingHelper.moveToStarted(newRouting);
shard.updateRoutingEntry(newRouting, true);
shard.updateRoutingEntry(newRouting);
} finally {
indicesService.deleteIndex(idx, "simon says");
}

View File

@ -0,0 +1,318 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.indices.cluster;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.Callback;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.NodeServicesProvider;
import org.elasticsearch.index.shard.IndexEventListener;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.cluster.IndicesClusterStateService.AllocatedIndex;
import org.elasticsearch.indices.cluster.IndicesClusterStateService.Shard;
import org.elasticsearch.indices.cluster.IndicesClusterStateService.AllocatedIndices;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.indices.recovery.RecoveryTargetService;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.test.ESTestCase;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import static java.util.Collections.emptyMap;
import static java.util.Collections.unmodifiableMap;
import static org.elasticsearch.common.collect.MapBuilder.newMapBuilder;
import static org.hamcrest.Matchers.equalTo;
/**
* Abstract base class for tests against {@link IndicesClusterStateService}
*/
public abstract class AbstractIndicesClusterStateServiceTestCase extends ESTestCase {
protected void failRandomly() {
if (rarely()) {
throw new RuntimeException("dummy test failure");
}
}
/**
* Checks if cluster state matches internal state of IndicesClusterStateService instance
*
* @param state cluster state used for matching
*/
public static void assertClusterStateMatchesNodeState(ClusterState state, IndicesClusterStateService indicesClusterStateService) {
AllocatedIndices<? extends Shard, ? extends AllocatedIndex<? extends Shard>> indicesService =
indicesClusterStateService.indicesService;
ConcurrentMap<ShardId, ShardRouting> failedShardsCache = indicesClusterStateService.failedShardsCache;
RoutingNode localRoutingNode = state.getRoutingNodes().node(state.getNodes().getLocalNodeId());
if (localRoutingNode != null) {
// check that all shards in local routing nodes have been allocated
for (ShardRouting shardRouting : localRoutingNode) {
Index index = shardRouting.index();
IndexMetaData indexMetaData = state.metaData().getIndexSafe(index);
Shard shard = indicesService.getShardOrNull(shardRouting.shardId());
ShardRouting failedShard = failedShardsCache.get(shardRouting.shardId());
if (shard == null && failedShard == null) {
fail("Shard with id " + shardRouting + " expected but missing in indicesService and failedShardsCache");
}
if (failedShard != null && failedShard.isSameAllocation(shardRouting) == false) {
fail("Shard cache has not been properly cleaned for " + failedShard);
}
if (shard != null) {
AllocatedIndex<? extends Shard> indexService = indicesService.indexService(index);
assertTrue("Index " + index + " expected but missing in indicesService", indexService != null);
// index metadata has been updated
assertThat(indexService.getIndexSettings().getIndexMetaData(), equalTo(indexMetaData));
// shard has been created
if (failedShard == null) {
assertTrue("Shard with id " + shardRouting + " expected but missing in indexService",
shard != null);
// shard has latest shard routing
assertThat(shard.routingEntry(), equalTo(shardRouting));
}
}
}
}
// all other shards / indices have been cleaned up
for (AllocatedIndex<? extends Shard> indexService : indicesService) {
assertTrue(state.metaData().getIndexSafe(indexService.index()) != null);
boolean shardsFound = false;
for (Shard shard : indexService) {
shardsFound = true;
ShardRouting persistedShardRouting = shard.routingEntry();
boolean found = false;
for (ShardRouting shardRouting : localRoutingNode) {
if (persistedShardRouting.equals(shardRouting)) {
found = true;
}
}
assertTrue(found);
}
if (shardsFound == false) {
// check if we have shards of that index in failedShardsCache
// if yes, we might not have cleaned the index as failedShardsCache can be populated by another thread
assertFalse(failedShardsCache.keySet().stream().noneMatch(shardId -> shardId.getIndex().equals(indexService.index())));
}
}
}
/**
* Mock for {@link IndicesService}
*/
protected class MockIndicesService implements AllocatedIndices<MockIndexShard, MockIndexService> {
private volatile Map<String, MockIndexService> indices = emptyMap();
@Override
public synchronized MockIndexService createIndex(NodeServicesProvider nodeServicesProvider, IndexMetaData indexMetaData,
List<IndexEventListener> buildInIndexListener) throws IOException {
MockIndexService indexService = new MockIndexService(new IndexSettings(indexMetaData, Settings.EMPTY));
indices = newMapBuilder(indices).put(indexMetaData.getIndexUUID(), indexService).immutableMap();
return indexService;
}
@Override
public IndexMetaData verifyIndexIsDeleted(Index index, ClusterState state) {
return null;
}
@Override
public void deleteUnassignedIndex(String reason, IndexMetaData metaData, ClusterState clusterState) {
}
@Override
public synchronized void deleteIndex(Index index, String reason) {
if (hasIndex(index) == false) {
return;
}
Map<String, MockIndexService> newIndices = new HashMap<>(indices);
newIndices.remove(index.getUUID());
indices = unmodifiableMap(newIndices);
}
@Override
public synchronized void removeIndex(Index index, String reason) {
if (hasIndex(index) == false) {
return;
}
Map<String, MockIndexService> newIndices = new HashMap<>(indices);
newIndices.remove(index.getUUID());
indices = unmodifiableMap(newIndices);
}
@Override
public @Nullable MockIndexService indexService(Index index) {
return indices.get(index.getUUID());
}
@Override
public MockIndexShard createShard(ShardRouting shardRouting, RecoveryState recoveryState,
RecoveryTargetService recoveryTargetService,
RecoveryTargetService.RecoveryListener recoveryListener, RepositoriesService repositoriesService,
NodeServicesProvider nodeServicesProvider, Callback<IndexShard.ShardFailure> onShardFailure)
throws IOException {
failRandomly();
MockIndexService indexService = indexService(recoveryState.getShardId().getIndex());
MockIndexShard indexShard = indexService.createShard(shardRouting);
indexShard.recoveryState = recoveryState;
return indexShard;
}
@Override
public void processPendingDeletes(Index index, IndexSettings indexSettings, TimeValue timeValue) throws IOException,
InterruptedException {
}
private boolean hasIndex(Index index) {
return indices.containsKey(index.getUUID());
}
@Override
public Iterator<MockIndexService> iterator() {
return indices.values().iterator();
}
}
/**
* Mock for {@link IndexService}
*/
protected class MockIndexService implements AllocatedIndex<MockIndexShard> {
private volatile Map<Integer, MockIndexShard> shards = emptyMap();
private final IndexSettings indexSettings;
public MockIndexService(IndexSettings indexSettings) {
this.indexSettings = indexSettings;
}
@Override
public IndexSettings getIndexSettings() {
return indexSettings;
}
@Override
public boolean updateMapping(IndexMetaData indexMetaData) throws IOException {
failRandomly();
return false;
}
@Override
public void updateMetaData(IndexMetaData indexMetaData) {
indexSettings.updateIndexMetaData(indexMetaData);
}
@Override
public MockIndexShard getShardOrNull(int shardId) {
return shards.get(shardId);
}
public synchronized MockIndexShard createShard(ShardRouting routing) throws IOException {
failRandomly();
MockIndexShard shard = new MockIndexShard(routing);
shards = newMapBuilder(shards).put(routing.id(), shard).immutableMap();
return shard;
}
@Override
public synchronized void removeShard(int shardId, String reason) {
if (shards.containsKey(shardId) == false) {
return;
}
HashMap<Integer, MockIndexShard> newShards = new HashMap<>(shards);
MockIndexShard indexShard = newShards.remove(shardId);
assert indexShard != null;
shards = unmodifiableMap(newShards);
}
@Override
public Iterator<MockIndexShard> iterator() {
return shards.values().iterator();
}
@Override
public Index index() {
return indexSettings.getIndex();
}
}
/**
* Mock for {@link IndexShard}
*/
protected class MockIndexShard implements IndicesClusterStateService.Shard {
private volatile ShardRouting shardRouting;
private volatile RecoveryState recoveryState;
public MockIndexShard(ShardRouting shardRouting) {
this.shardRouting = shardRouting;
}
@Override
public ShardId shardId() {
return shardRouting.shardId();
}
@Override
public RecoveryState recoveryState() {
return recoveryState;
}
@Override
public ShardRouting routingEntry() {
return shardRouting;
}
@Override
public IndexShardState state() {
return null;
}
@Override
public void updateRoutingEntry(ShardRouting shardRouting) throws IOException {
failRandomly();
assert this.shardId().equals(shardRouting.shardId());
assert this.shardRouting.isSameAllocation(shardRouting);
this.shardRouting = shardRouting;
}
}
}

View File

@ -0,0 +1,234 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.indices.cluster;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteRequest;
import org.elasticsearch.action.admin.cluster.reroute.TransportClusterRerouteAction;
import org.elasticsearch.action.admin.indices.close.CloseIndexRequest;
import org.elasticsearch.action.admin.indices.close.TransportCloseIndexAction;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.TransportCreateIndexAction;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.delete.TransportDeleteIndexAction;
import org.elasticsearch.action.admin.indices.open.OpenIndexRequest;
import org.elasticsearch.action.admin.indices.open.TransportOpenIndexAction;
import org.elasticsearch.action.admin.indices.settings.put.TransportUpdateSettingsAction;
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.DestructiveOperations;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.master.MasterNodeRequest;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.action.support.master.TransportMasterNodeActionUtils;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.EmptyClusterInfoService;
import org.elasticsearch.cluster.metadata.AliasValidator;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MetaDataCreateIndexService;
import org.elasticsearch.cluster.metadata.MetaDataDeleteIndexService;
import org.elasticsearch.cluster.metadata.MetaDataIndexStateService;
import org.elasticsearch.cluster.metadata.MetaDataIndexUpgradeService;
import org.elasticsearch.cluster.metadata.MetaDataUpdateSettingsService;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.FailedRerouteAllocation;
import org.elasticsearch.cluster.routing.allocation.RandomAllocationDeciderTests;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
import org.elasticsearch.cluster.routing.allocation.decider.ReplicaAfterPrimaryActiveAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.SameShardAllocationDecider;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.IndexScopedSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.Environment;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.NodeServicesProvider;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.shard.IndexEventListener;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.test.gateway.NoopGatewayAllocator;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import static com.carrotsearch.randomizedtesting.RandomizedTest.getRandom;
import static org.elasticsearch.env.Environment.PATH_HOME_SETTING;
import static org.hamcrest.Matchers.notNullValue;
import static org.junit.Assert.assertThat;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.anyList;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class ClusterStateChanges {
private final ClusterService clusterService;
private final AllocationService allocationService;
// transport actions
private final TransportCloseIndexAction transportCloseIndexAction;
private final TransportOpenIndexAction transportOpenIndexAction;
private final TransportDeleteIndexAction transportDeleteIndexAction;
private final TransportUpdateSettingsAction transportUpdateSettingsAction;
private final TransportClusterRerouteAction transportClusterRerouteAction;
private final TransportCreateIndexAction transportCreateIndexAction;
public ClusterStateChanges() {
Settings settings = Settings.builder().put(PATH_HOME_SETTING.getKey(), "dummy").build();
allocationService = new AllocationService(settings, new AllocationDeciders(settings,
new HashSet<>(Arrays.asList(new SameShardAllocationDecider(settings),
new ReplicaAfterPrimaryActiveAllocationDecider(settings),
new RandomAllocationDeciderTests.RandomAllocationDecider(getRandom())))),
NoopGatewayAllocator.INSTANCE, new BalancedShardsAllocator(settings),
EmptyClusterInfoService.INSTANCE);
ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
ActionFilters actionFilters = new ActionFilters(Collections.emptySet());
IndexNameExpressionResolver indexNameExpressionResolver = new IndexNameExpressionResolver(settings);
DestructiveOperations destructiveOperations = new DestructiveOperations(settings, clusterSettings);
Environment environment = new Environment(settings);
ThreadPool threadPool = null; // it's not used
Transport transport = null; // it's not used
// mocks
clusterService = mock(ClusterService.class);
IndicesService indicesService = mock(IndicesService.class);
// MetaDataCreateIndexService creates indices using its IndicesService instance to check mappings -> fake it here
try {
when(indicesService.createIndex(any(NodeServicesProvider.class), any(IndexMetaData.class), anyList()))
.then(invocationOnMock -> {
IndexService indexService = mock(IndexService.class);
IndexMetaData indexMetaData = (IndexMetaData)invocationOnMock.getArguments()[1];
when(indexService.index()).thenReturn(indexMetaData.getIndex());
MapperService mapperService = mock(MapperService.class);
when(indexService.mapperService()).thenReturn(mapperService);
when(mapperService.docMappers(anyBoolean())).thenReturn(Collections.emptyList());
when(indexService.getIndexEventListener()).thenReturn(new IndexEventListener() {});
return indexService;
});
} catch (IOException e) {
throw new IllegalStateException(e);
}
// services
TransportService transportService = new TransportService(settings, transport, threadPool, null);
MetaDataIndexUpgradeService metaDataIndexUpgradeService = new MetaDataIndexUpgradeService(settings, null, null) {
// metaData upgrader should do nothing
@Override
public IndexMetaData upgradeIndexMetaData(IndexMetaData indexMetaData) {
return indexMetaData;
}
};
NodeServicesProvider nodeServicesProvider = new NodeServicesProvider(threadPool, null, null, null, null, null, clusterService);
MetaDataIndexStateService indexStateService = new MetaDataIndexStateService(settings, clusterService, allocationService,
metaDataIndexUpgradeService, nodeServicesProvider, indicesService);
MetaDataDeleteIndexService deleteIndexService = new MetaDataDeleteIndexService(settings, clusterService, allocationService);
MetaDataUpdateSettingsService metaDataUpdateSettingsService = new MetaDataUpdateSettingsService(settings, clusterService,
allocationService, IndexScopedSettings.DEFAULT_SCOPED_SETTINGS, new IndexNameExpressionResolver(settings));
MetaDataCreateIndexService createIndexService = new MetaDataCreateIndexService(settings, clusterService, indicesService,
allocationService, Version.CURRENT, new AliasValidator(settings), Collections.emptySet(), environment,
nodeServicesProvider, IndexScopedSettings.DEFAULT_SCOPED_SETTINGS);
transportCloseIndexAction = new TransportCloseIndexAction(settings, transportService, clusterService, threadPool,
indexStateService, clusterSettings, actionFilters, indexNameExpressionResolver, destructiveOperations);
transportOpenIndexAction = new TransportOpenIndexAction(settings, transportService,
clusterService, threadPool, indexStateService, actionFilters, indexNameExpressionResolver, destructiveOperations);
transportDeleteIndexAction = new TransportDeleteIndexAction(settings, transportService,
clusterService, threadPool, deleteIndexService, actionFilters, indexNameExpressionResolver, destructiveOperations);
transportUpdateSettingsAction = new TransportUpdateSettingsAction(settings,
transportService, clusterService, threadPool, metaDataUpdateSettingsService, actionFilters, indexNameExpressionResolver);
transportClusterRerouteAction = new TransportClusterRerouteAction(settings,
transportService, clusterService, threadPool, allocationService, actionFilters, indexNameExpressionResolver);
transportCreateIndexAction = new TransportCreateIndexAction(settings,
transportService, clusterService, threadPool, createIndexService, actionFilters, indexNameExpressionResolver);
}
public ClusterState createIndex(ClusterState state, CreateIndexRequest request) {
return execute(transportCreateIndexAction, request, state);
}
public ClusterState closeIndices(ClusterState state, CloseIndexRequest request) {
return execute(transportCloseIndexAction, request, state);
}
public ClusterState openIndices(ClusterState state, OpenIndexRequest request) {
return execute(transportOpenIndexAction, request, state);
}
public ClusterState deleteIndices(ClusterState state, DeleteIndexRequest request) {
return execute(transportDeleteIndexAction, request, state);
}
public ClusterState updateSettings(ClusterState state, UpdateSettingsRequest request) {
return execute(transportUpdateSettingsAction, request, state);
}
public ClusterState reroute(ClusterState state, ClusterRerouteRequest request) {
return execute(transportClusterRerouteAction, request, state);
}
public ClusterState applyFailedShards(ClusterState clusterState, List<FailedRerouteAllocation.FailedShard> failedShards) {
RoutingAllocation.Result rerouteResult = allocationService.applyFailedShards(clusterState, failedShards);
return ClusterState.builder(clusterState).routingResult(rerouteResult).build();
}
public ClusterState applyStartedShards(ClusterState clusterState, List<ShardRouting> startedShards) {
RoutingAllocation.Result rerouteResult = allocationService.applyStartedShards(clusterState, startedShards);
return ClusterState.builder(clusterState).routingResult(rerouteResult).build();
}
private <Request extends MasterNodeRequest<Request>, Response extends ActionResponse> ClusterState execute(
TransportMasterNodeAction<Request, Response> masterNodeAction, Request request, ClusterState clusterState) {
return executeClusterStateUpdateTask(clusterState, () -> {
try {
TransportMasterNodeActionUtils.runMasterOperation(masterNodeAction, request, clusterState, new PlainActionFuture<>());
} catch (Exception e) {
throw new RuntimeException(e);
}
});
}
private ClusterState executeClusterStateUpdateTask(ClusterState state, Runnable runnable) {
ClusterState[] result = new ClusterState[1];
doAnswer(invocationOnMock -> {
ClusterStateUpdateTask task = (ClusterStateUpdateTask)invocationOnMock.getArguments()[1];
result[0] = task.execute(state);
return null;
}).when(clusterService).submitStateUpdateTask(anyString(), any(ClusterStateUpdateTask.class));
runnable.run();
assertThat(result[0], notNullValue());
return result[0];
}
}

View File

@ -0,0 +1,281 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.indices.cluster;
import org.elasticsearch.Version;
import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteRequest;
import org.elasticsearch.action.admin.indices.close.CloseIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.open.OpenIndexRequest;
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
import org.elasticsearch.action.support.replication.ClusterStateCreationUtils;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.FailedRerouteAllocation.FailedShard;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.DummyTransportAddress;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.indices.recovery.RecoveryTargetService;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.Executor;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class IndicesClusterStateServiceRandomUpdatesTests extends AbstractIndicesClusterStateServiceTestCase {
private final ClusterStateChanges cluster = new ClusterStateChanges();
public void testRandomClusterStateUpdates() {
// we have an IndicesClusterStateService per node in the cluster
final Map<DiscoveryNode, IndicesClusterStateService> clusterStateServiceMap = new HashMap<>();
ClusterState state = randomInitialClusterState(clusterStateServiceMap);
// each of the following iterations represents a new cluster state update processed on all nodes
for (int i = 0; i < 30; i++) {
logger.info("Iteration {}", i);
final ClusterState previousState = state;
// calculate new cluster state
for (int j = 0; j < randomInt(3); j++) { // multiple iterations to simulate batching of cluster states
state = randomlyUpdateClusterState(state, clusterStateServiceMap);
}
// apply cluster state to nodes (incl. master)
for (DiscoveryNode node : state.nodes()) {
IndicesClusterStateService indicesClusterStateService = clusterStateServiceMap.get(node);
ClusterState localState = adaptClusterStateToLocalNode(state, node);
ClusterState previousLocalState = adaptClusterStateToLocalNode(previousState, node);
indicesClusterStateService.clusterChanged(new ClusterChangedEvent("simulated change " + i, localState, previousLocalState));
// check that cluster state has been properly applied to node
assertClusterStateMatchesNodeState(localState, indicesClusterStateService);
}
}
// TODO: check if we can go to green by starting all shards and finishing all iterations
logger.info("Final cluster state: {}", state.prettyPrint());
}
public ClusterState randomInitialClusterState(Map<DiscoveryNode, IndicesClusterStateService> clusterStateServiceMap) {
List<DiscoveryNode> allNodes = new ArrayList<>();
DiscoveryNode localNode = createNode(DiscoveryNode.Role.MASTER); // local node is the master
allNodes.add(localNode);
// at least two nodes that have the data role so that we can allocate shards
allNodes.add(createNode(DiscoveryNode.Role.DATA));
allNodes.add(createNode(DiscoveryNode.Role.DATA));
for (int i = 0; i < randomIntBetween(2, 5); i++) {
allNodes.add(createNode());
}
ClusterState state = ClusterStateCreationUtils.state(localNode, localNode, allNodes.toArray(new DiscoveryNode[allNodes.size()]));
// add nodes to clusterStateServiceMap
updateNodes(state, clusterStateServiceMap);
return state;
}
private void updateNodes(ClusterState state, Map<DiscoveryNode, IndicesClusterStateService> clusterStateServiceMap) {
for (DiscoveryNode node : state.nodes()) {
clusterStateServiceMap.computeIfAbsent(node, discoveryNode -> {
IndicesClusterStateService ics = createIndicesClusterStateService();
ics.start();
return ics;
});
}
for (Iterator<Entry<DiscoveryNode, IndicesClusterStateService>> it = clusterStateServiceMap.entrySet().iterator(); it.hasNext(); ) {
DiscoveryNode node = it.next().getKey();
if (state.nodes().nodeExists(node.getId()) == false) {
it.remove();
}
}
}
public ClusterState randomlyUpdateClusterState(ClusterState state,
Map<DiscoveryNode, IndicesClusterStateService> clusterStateServiceMap) {
// randomly create new indices (until we have 200 max)
for (int i = 0; i < randomInt(5); i++) {
if (state.metaData().indices().size() > 200) {
break;
}
String name = "index_" + randomAsciiOfLength(15).toLowerCase(Locale.ROOT);
CreateIndexRequest request = new CreateIndexRequest(name, Settings.builder()
.put(SETTING_NUMBER_OF_SHARDS, randomIntBetween(1, 3))
.put(SETTING_NUMBER_OF_REPLICAS, randomInt(2))
.build());
state = cluster.createIndex(state, request);
assertTrue(state.metaData().hasIndex(name));
}
// randomly delete indices
Set<String> indicesToDelete = new HashSet<>();
int numberOfIndicesToDelete = randomInt(Math.min(2, state.metaData().indices().size()));
for (String index : randomSubsetOf(numberOfIndicesToDelete, state.metaData().indices().keys().toArray(String.class))) {
indicesToDelete.add(state.metaData().index(index).getIndex().getName());
}
if (indicesToDelete.isEmpty() == false) {
DeleteIndexRequest deleteRequest = new DeleteIndexRequest(indicesToDelete.toArray(new String[indicesToDelete.size()]));
state = cluster.deleteIndices(state, deleteRequest);
for (String index : indicesToDelete) {
assertFalse(state.metaData().hasIndex(index));
}
}
// randomly close indices
int numberOfIndicesToClose = randomInt(Math.min(1, state.metaData().indices().size()));
for (String index : randomSubsetOf(numberOfIndicesToClose, state.metaData().indices().keys().toArray(String.class))) {
CloseIndexRequest closeIndexRequest = new CloseIndexRequest(state.metaData().index(index).getIndex().getName());
state = cluster.closeIndices(state, closeIndexRequest);
}
// randomly open indices
int numberOfIndicesToOpen = randomInt(Math.min(1, state.metaData().indices().size()));
for (String index : randomSubsetOf(numberOfIndicesToOpen, state.metaData().indices().keys().toArray(String.class))) {
OpenIndexRequest openIndexRequest = new OpenIndexRequest(state.metaData().index(index).getIndex().getName());
state = cluster.openIndices(state, openIndexRequest);
}
// randomly update settings
Set<String> indicesToUpdate = new HashSet<>();
boolean containsClosedIndex = false;
int numberOfIndicesToUpdate = randomInt(Math.min(2, state.metaData().indices().size()));
for (String index : randomSubsetOf(numberOfIndicesToUpdate, state.metaData().indices().keys().toArray(String.class))) {
indicesToUpdate.add(state.metaData().index(index).getIndex().getName());
if (state.metaData().index(index).getState() == IndexMetaData.State.CLOSE) {
containsClosedIndex = true;
}
}
if (indicesToUpdate.isEmpty() == false) {
UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(
indicesToUpdate.toArray(new String[indicesToUpdate.size()]));
Settings.Builder settings = Settings.builder();
if (containsClosedIndex == false) {
settings.put(SETTING_NUMBER_OF_REPLICAS, randomInt(2));
}
settings.put("index.refresh_interval", randomIntBetween(1, 5) + "s");
updateSettingsRequest.settings(settings.build());
state = cluster.updateSettings(state, updateSettingsRequest);
}
// randomly reroute
if (rarely()) {
state = cluster.reroute(state, new ClusterRerouteRequest());
}
// randomly start and fail allocated shards
List<ShardRouting> startedShards = new ArrayList<>();
List<FailedShard> failedShards = new ArrayList<>();
for (DiscoveryNode node : state.nodes()) {
IndicesClusterStateService indicesClusterStateService = clusterStateServiceMap.get(node);
MockIndicesService indicesService = (MockIndicesService) indicesClusterStateService.indicesService;
for (MockIndexService indexService : indicesService) {
for (MockIndexShard indexShard : indexService) {
ShardRouting persistedShardRouting = indexShard.routingEntry();
if (persistedShardRouting.initializing() && randomBoolean()) {
startedShards.add(persistedShardRouting);
} else if (rarely()) {
failedShards.add(new FailedShard(persistedShardRouting, "fake shard failure", new Exception()));
}
}
}
}
state = cluster.applyFailedShards(state, failedShards);
state = cluster.applyStartedShards(state, startedShards);
// randomly add and remove nodes (except current master)
if (rarely()) {
if (randomBoolean()) {
// add node
if (state.nodes().getSize() < 10) {
DiscoveryNodes newNodes = DiscoveryNodes.builder(state.nodes()).put(createNode()).build();
state = ClusterState.builder(state).nodes(newNodes).build();
state = cluster.reroute(state, new ClusterRerouteRequest()); // always reroute after node leave
updateNodes(state, clusterStateServiceMap);
}
} else {
// remove node
if (state.nodes().getDataNodes().size() > 3) {
DiscoveryNode discoveryNode = randomFrom(state.nodes().getNodes().values().toArray(DiscoveryNode.class));
if (discoveryNode.equals(state.nodes().getMasterNode()) == false) {
DiscoveryNodes newNodes = DiscoveryNodes.builder(state.nodes()).remove(discoveryNode.getId()).build();
state = ClusterState.builder(state).nodes(newNodes).build();
state = cluster.reroute(state, new ClusterRerouteRequest()); // always reroute after node join
updateNodes(state, clusterStateServiceMap);
}
}
}
}
// TODO: go masterless?
return state;
}
protected DiscoveryNode createNode(DiscoveryNode.Role... mustHaveRoles) {
Set<DiscoveryNode.Role> roles = new HashSet<>(randomSubsetOf(Sets.newHashSet(DiscoveryNode.Role.values())));
for (DiscoveryNode.Role mustHaveRole : mustHaveRoles) {
roles.add(mustHaveRole);
}
return new DiscoveryNode("node_" + randomAsciiOfLength(8), DummyTransportAddress.INSTANCE, Collections.emptyMap(), roles,
Version.CURRENT);
}
private static ClusterState adaptClusterStateToLocalNode(ClusterState state, DiscoveryNode node) {
return ClusterState.builder(state).nodes(DiscoveryNodes.builder(state.nodes()).localNodeId(node.getId())).build();
}
private IndicesClusterStateService createIndicesClusterStateService() {
final ThreadPool threadPool = mock(ThreadPool.class);
final Executor executor = mock(Executor.class);
when(threadPool.generic()).thenReturn(executor);
final MockIndicesService indicesService = new MockIndicesService();
final TransportService transportService = new TransportService(Settings.EMPTY, null, threadPool, null);
final ClusterService clusterService = mock(ClusterService.class);
final RepositoriesService repositoriesService = new RepositoriesService(Settings.EMPTY, clusterService,
transportService, null, null);
final RecoveryTargetService recoveryTargetService = new RecoveryTargetService(Settings.EMPTY, threadPool,
transportService, null, clusterService);
final ShardStateAction shardStateAction = mock(ShardStateAction.class);
return new IndicesClusterStateService(Settings.EMPTY, indicesService, clusterService,
threadPool, recoveryTargetService, shardStateAction, null, repositoriesService, null, null, null, null, null);
}
}