From 48443259217e431146c8d3b13c01cdd5ecd7a637 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Mon, 6 Jun 2016 12:53:04 +0200 Subject: [PATCH] Introduced Global checkpoints for Sequence Numbers (#15485) Global checkpoints are update by the primary and represent the common part of history across shard copies, as know at a given time. The primary is also in charge of periodically broadcast this information to the replicas. See #10708 for more details. --- .../action/ActionListenerResponseHandler.java | 4 +- .../replication/ReplicationOperation.java | 40 ++- .../TransportReplicationAction.java | 88 ++++++- .../metadata/MetaDataCreateIndexService.java | 4 +- .../metadata/MetaDataIndexAliasesService.java | 3 +- .../metadata/MetaDataMappingService.java | 5 +- .../routing/IndexShardRoutingTable.java | 9 + .../common/settings/IndexScopedSettings.java | 3 + .../org/elasticsearch/index/IndexModule.java | 10 +- .../org/elasticsearch/index/IndexService.java | 56 +++- .../elasticsearch/index/IndexSettings.java | 12 + .../elasticsearch/index/engine/Engine.java | 10 +- .../index/engine/InternalEngine.java | 5 +- .../index/engine/ShadowEngine.java | 4 +- .../index/seqno/GlobalCheckpointService.java | 244 ++++++++++++++++++ .../seqno/GlobalCheckpointSyncAction.java | 132 ++++++++++ .../index/seqno/LocalCheckpointService.java | 14 +- .../elasticsearch/index/seqno/SeqNoStats.java | 20 +- .../index/seqno/SequenceNumbersService.java | 69 ++++- .../elasticsearch/index/shard/IndexShard.java | 83 +++++- .../index/shard/ShadowIndexShard.java | 21 +- .../elasticsearch/indices/IndicesModule.java | 5 +- .../elasticsearch/indices/IndicesService.java | 19 +- .../cluster/IndicesClusterStateService.java | 16 +- .../recovery/RecoverySourceHandler.java | 5 +- .../indices/recovery/RecoveryTarget.java | 6 +- .../recovery/RecoveryTargetHandler.java | 42 ++- .../recovery/RecoveryTargetService.java | 6 +- .../recovery/RemoteRecoveryTargetHandler.java | 12 +- .../ReplicationOperationTests.java | 50 +++- .../TransportReplicationActionTests.java | 12 +- .../elasticsearch/index/IndexModuleTests.java | 34 +-- .../index/IndexServiceTests.java | 14 + .../index/engine/InternalEngineTests.java | 13 +- .../index/seqno/CheckpointsIT.java | 79 ++++++ .../index/seqno/GlobalCheckpointTests.java | 143 ++++++++++ .../seqno/LocalCheckpointServiceTests.java | 12 +- .../index/shard/IndexShardTests.java | 3 +- ...dicesLifecycleListenerSingleNodeTests.java | 2 +- .../test/InternalTestCluster.java | 65 +++-- 40 files changed, 1216 insertions(+), 158 deletions(-) create mode 100644 core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointService.java create mode 100644 core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncAction.java create mode 100644 core/src/test/java/org/elasticsearch/index/seqno/CheckpointsIT.java create mode 100644 core/src/test/java/org/elasticsearch/index/seqno/GlobalCheckpointTests.java diff --git a/core/src/main/java/org/elasticsearch/action/ActionListenerResponseHandler.java b/core/src/main/java/org/elasticsearch/action/ActionListenerResponseHandler.java index 6cdc1c3194f..301f7ea4134 100644 --- a/core/src/main/java/org/elasticsearch/action/ActionListenerResponseHandler.java +++ b/core/src/main/java/org/elasticsearch/action/ActionListenerResponseHandler.java @@ -33,10 +33,10 @@ import java.util.function.Supplier; */ public class ActionListenerResponseHandler extends BaseTransportResponseHandler { - private final ActionListener listener; + private final ActionListener listener; private final Supplier responseSupplier; - public ActionListenerResponseHandler(ActionListener listener, Supplier responseSupplier) { + public ActionListenerResponseHandler(ActionListener listener, Supplier responseSupplier) { this.listener = Objects.requireNonNull(listener); this.responseSupplier = Objects.requireNonNull(responseSupplier); } diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java b/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java index 1f7313c1943..5c7e8191ecc 100644 --- a/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java +++ b/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java @@ -35,7 +35,6 @@ import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.index.engine.VersionConflictEngineException; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.rest.RestStatus; -import org.elasticsearch.transport.TransportResponse; import java.io.IOException; import java.util.ArrayList; @@ -85,7 +84,8 @@ public class ReplicationOperation, R void execute() throws Exception { final String writeConsistencyFailure = checkWriteConsistency ? checkWriteConsistency() : null; - final ShardId shardId = primary.routingEntry().shardId(); + final ShardRouting primaryRouting = primary.routingEntry(); + final ShardId shardId = primaryRouting.shardId(); if (writeConsistencyFailure != null) { finishAsFailed(new UnavailableShardsException(shardId, "{} Timeout: [{}], request: [{}]", writeConsistencyFailure, request.timeout(), request)); @@ -96,6 +96,7 @@ public class ReplicationOperation, R pendingShards.incrementAndGet(); // increase by 1 until we finish all primary coordination Tuple primaryResponse = primary.perform(request); successfulShards.incrementAndGet(); // mark primary as successful + primary.updateLocalCheckpointForShard(primaryRouting.allocationId().getId(), primary.localCheckpoint()); finalResponse = primaryResponse.v1(); ReplicaRequest replicaRequest = primaryResponse.v2(); assert replicaRequest.primaryTerm() > 0 : "replicaRequest doesn't have a primary term"; @@ -107,7 +108,7 @@ public class ReplicationOperation, R // to the recovery target. If we use an old cluster state, we may miss a relocation that has started since then. // If the index gets deleted after primary operation, we skip replication List shards = getShards(shardId, clusterStateSupplier.get()); - final String localNodeId = primary.routingEntry().currentNodeId(); + final String localNodeId = primaryRouting.currentNodeId(); for (final ShardRouting shard : shards) { if (executeOnReplicas == false || shard.unassigned()) { if (shard.primary() == false) { @@ -136,10 +137,11 @@ public class ReplicationOperation, R totalShards.incrementAndGet(); pendingShards.incrementAndGet(); - replicasProxy.performOn(shard, replicaRequest, new ActionListener() { + replicasProxy.performOn(shard, replicaRequest, new ActionListener() { @Override - public void onResponse(TransportResponse.Empty empty) { + public void onResponse(ReplicaResponse response) { successfulShards.incrementAndGet(); + primary.updateLocalCheckpointForShard(response.allocationId(), response.localCheckpoint()); decPendingAndFinishIfNeeded(); } @@ -301,18 +303,30 @@ public class ReplicationOperation, R */ Tuple perform(Request request) throws Exception; + + /** + * Notifies the primary of a local checkpoint for the given allocation. + * + * Note: The primary will use this information to advance the global checkpoint if possible. + * + * @param allocationId allocation ID of the shard corresponding to the supplied local checkpoint + * @param checkpoint the *local* checkpoint for the shard + */ + void updateLocalCheckpointForShard(String allocationId, long checkpoint); + + /** returns the local checkpoint of the primary shard */ + long localCheckpoint(); } interface Replicas> { /** * performs the the given request on the specified replica - * * @param replica {@link ShardRouting} of the shard this request should be executed on * @param replicaRequest operation to peform * @param listener a callback to call once the operation has been complicated, either successfully or with an error. */ - void performOn(ShardRouting replica, ReplicaRequest replicaRequest, ActionListener listener); + void performOn(ShardRouting replica, ReplicaRequest replicaRequest, ActionListener listener); /** * Fail the specified shard, removing it from the current set of active shards @@ -331,6 +345,18 @@ public class ReplicationOperation, R Consumer onPrimaryDemoted, Consumer onIgnoredFailure); } + /** + * An interface to encapsulate the metadata needed from replica shards when they respond to operations performed on them + */ + interface ReplicaResponse { + + /** the local check point for the shard. see {@link org.elasticsearch.index.seqno.SequenceNumbersService#getLocalCheckpoint()} */ + long localCheckpoint(); + + /** the allocation id of the replica shard */ + String allocationId(); + } + public static class RetryOnPrimaryException extends ElasticsearchException { public RetryOnPrimaryException(ShardId shardId, String msg) { this(shardId, msg, null); diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index 8a721dfe508..d9df6416360 100644 --- a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -22,6 +22,7 @@ package org.elasticsearch.action.support.replication; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListenerResponseHandler; +import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.ReplicationResponse; import org.elasticsearch.action.UnavailableShardsException; import org.elasticsearch.action.WriteConsistencyLevel; @@ -43,6 +44,7 @@ import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; @@ -425,16 +427,18 @@ public abstract class TransportReplicationAction, Releasable { + class ShardReference implements Releasable { - private final IndexShard indexShard; + protected final IndexShard indexShard; private final Releasable operationLock; - PrimaryShardReference(IndexShard indexShard, Releasable operationLock) { + ShardReference(IndexShard indexShard, Releasable operationLock) { this.indexShard = indexShard; this.operationLock = operationLock; } @@ -737,6 +741,22 @@ public abstract class TransportReplicationAction { + + PrimaryShardReference(IndexShard indexShard, Releasable operationLock) { + super(indexShard, operationLock); + } + public boolean isRelocated() { return indexShard.state() == IndexShardState.RELOCATED; } @@ -758,15 +778,59 @@ public abstract class TransportReplicationAction { @Override - public void performOn(ShardRouting replica, ReplicaRequest request, ActionListener listener) { + public void performOn(ShardRouting replica, ReplicaRequest request, ActionListener listener) { String nodeId = replica.currentNodeId(); final DiscoveryNode node = clusterService.state().nodes().get(nodeId); if (node == null) { @@ -774,7 +838,7 @@ public abstract class TransportReplicationAction(listener, () -> TransportResponse.Empty.INSTANCE)); + new ActionListenerResponseHandler<>(listener, ReplicaResponse::new)); } @Override diff --git a/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java b/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java index c8c352fc46d..efc1cbac6c1 100644 --- a/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java +++ b/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java @@ -65,7 +65,6 @@ import org.elasticsearch.indices.IndexAlreadyExistsException; import org.elasticsearch.indices.IndexCreationException; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.InvalidIndexNameException; -import org.elasticsearch.script.ScriptService; import org.joda.time.DateTime; import org.joda.time.DateTimeZone; @@ -299,7 +298,8 @@ public class MetaDataCreateIndexService extends AbstractComponent { // Set up everything, now locally create the index to see that things are ok, and apply final IndexMetaData tmpImd = IndexMetaData.builder(request.index()).settings(actualIndexSettings).build(); // create the index here (on the master) to validate it can be created, as well as adding the mapping - final IndexService indexService = indicesService.createIndex(nodeServicesProvider, tmpImd, Collections.emptyList()); + final IndexService indexService = indicesService.createIndex(nodeServicesProvider, tmpImd, + Collections.emptyList(), shardId -> {}); createdIndex = indexService.index(); // now add the mappings MapperService mapperService = indexService.mapperService(); diff --git a/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexAliasesService.java b/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexAliasesService.java index e39b86a1611..72c672b073c 100644 --- a/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexAliasesService.java +++ b/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexAliasesService.java @@ -104,7 +104,8 @@ public class MetaDataIndexAliasesService extends AbstractComponent { if (indexService == null) { // temporarily create the index and add mappings so we can parse the filter try { - indexService = indicesService.createIndex(nodeServicesProvider, indexMetaData, Collections.emptyList()); + indexService = indicesService.createIndex(nodeServicesProvider, indexMetaData, + Collections.emptyList(), shardId -> {}); for (ObjectCursor cursor : indexMetaData.getMappings().values()) { MappingMetaData mappingMetaData = cursor.value; indexService.mapperService().merge(mappingMetaData.type(), mappingMetaData.source(), MapperService.MergeReason.MAPPING_RECOVERY, false); diff --git a/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataMappingService.java b/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataMappingService.java index d3b5e7ecbad..13bc8f258f3 100644 --- a/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataMappingService.java +++ b/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataMappingService.java @@ -42,7 +42,6 @@ import org.elasticsearch.index.IndexService; import org.elasticsearch.index.NodeServicesProvider; import org.elasticsearch.index.mapper.DocumentMapper; import org.elasticsearch.index.mapper.MapperService; -import org.elasticsearch.index.percolator.PercolatorFieldMapper; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.InvalidTypeNameException; @@ -140,7 +139,7 @@ public class MetaDataMappingService extends AbstractComponent { IndexService indexService = indicesService.indexService(indexMetaData.getIndex()); if (indexService == null) { // we need to create the index here, and add the current mapping to it, so we can merge - indexService = indicesService.createIndex(nodeServicesProvider, indexMetaData, Collections.emptyList()); + indexService = indicesService.createIndex(nodeServicesProvider, indexMetaData, Collections.emptyList(), shardId -> {}); removeIndex = true; for (ObjectCursor metaData : indexMetaData.getMappings().values()) { // don't apply the default mapping, it has been applied when the mapping was created @@ -224,7 +223,7 @@ public class MetaDataMappingService extends AbstractComponent { // close it later once we are done with mapping update indicesToClose.add(indexMetaData.getIndex()); IndexService indexService = indicesService.createIndex(nodeServicesProvider, indexMetaData, - Collections.emptyList()); + Collections.emptyList(), shardId -> {}); // add mappings for all types, we need them for cross-type validation for (ObjectCursor mapping : indexMetaData.getMappings().values()) { indexService.mapperService().merge(mapping.value.type(), mapping.value.source(), diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/IndexShardRoutingTable.java b/core/src/main/java/org/elasticsearch/cluster/routing/IndexShardRoutingTable.java index 53b094bc34b..86c2b89383f 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/IndexShardRoutingTable.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/IndexShardRoutingTable.java @@ -183,6 +183,15 @@ public class IndexShardRoutingTable implements Iterable { return this.activeShards; } + /** + * Returns a {@link List} of all initializing shards, including target shards of relocations + * + * @return a {@link List} of shards + */ + public List getAllInitializingShards() { + return this.allInitializingShards; + } + /** * Returns a {@link List} of active shards * diff --git a/core/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java b/core/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java index 203d1db76b3..ed4e691a70b 100644 --- a/core/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java +++ b/core/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java @@ -37,6 +37,7 @@ import org.elasticsearch.index.fielddata.IndexFieldDataService; import org.elasticsearch.index.mapper.FieldMapper; import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.percolator.PercolatorFieldMapper; +import org.elasticsearch.index.seqno.LocalCheckpointService; import org.elasticsearch.index.similarity.SimilarityService; import org.elasticsearch.index.store.FsDirectoryService; import org.elasticsearch.index.store.IndexStore; @@ -116,6 +117,8 @@ public final class IndexScopedSettings extends AbstractScopedSettings { IndexSettings.QUERY_STRING_LENIENT_SETTING, IndexSettings.ALLOW_UNMAPPED, IndexSettings.INDEX_CHECK_ON_STARTUP, + IndexSettings.INDEX_SEQ_NO_CHECKPOINT_SYNC_INTERVAL, + LocalCheckpointService.SETTINGS_BIT_ARRAYS_SIZE, ShardsLimitAllocationDecider.INDEX_TOTAL_SHARDS_PER_NODE_SETTING, IndexSettings.INDEX_GC_DELETES_SETTING, IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING, diff --git a/core/src/main/java/org/elasticsearch/index/IndexModule.java b/core/src/main/java/org/elasticsearch/index/IndexModule.java index 6ceabd1146d..2f8a8fe5a2a 100644 --- a/core/src/main/java/org/elasticsearch/index/IndexModule.java +++ b/core/src/main/java/org/elasticsearch/index/IndexModule.java @@ -26,14 +26,15 @@ import org.elasticsearch.common.settings.Setting.Property; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.index.analysis.AnalysisRegistry; -import org.elasticsearch.index.cache.query.QueryCache; -import org.elasticsearch.index.cache.query.IndexQueryCache; import org.elasticsearch.index.cache.query.DisabledQueryCache; +import org.elasticsearch.index.cache.query.IndexQueryCache; +import org.elasticsearch.index.cache.query.QueryCache; import org.elasticsearch.index.engine.EngineFactory; import org.elasticsearch.index.shard.IndexEventListener; import org.elasticsearch.index.shard.IndexSearcherWrapper; import org.elasticsearch.index.shard.IndexingOperationListener; import org.elasticsearch.index.shard.SearchOperationListener; +import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.similarity.BM25SimilarityProvider; import org.elasticsearch.index.similarity.SimilarityProvider; import org.elasticsearch.index.similarity.SimilarityService; @@ -291,7 +292,8 @@ public final class IndexModule { public IndexService newIndexService(NodeEnvironment environment, IndexService.ShardStoreDeleter shardStoreDeleter, NodeServicesProvider servicesProvider, IndicesQueryCache indicesQueryCache, - MapperRegistry mapperRegistry, IndicesFieldDataCache indicesFieldDataCache) throws IOException { + MapperRegistry mapperRegistry, Consumer globalCheckpointSyncer, + IndicesFieldDataCache indicesFieldDataCache) throws IOException { final IndexEventListener eventListener = freeze(); IndexSearcherWrapperFactory searcherWrapperFactory = indexSearcherWrapper.get() == null ? (shard) -> null : indexSearcherWrapper.get(); @@ -326,7 +328,7 @@ public final class IndexModule { } return new IndexService(indexSettings, environment, new SimilarityService(indexSettings, similarities), shardStoreDeleter, analysisRegistry, engineFactory.get(), servicesProvider, queryCache, store, eventListener, searcherWrapperFactory, - mapperRegistry, indicesFieldDataCache, searchOperationListeners, indexOperationListeners); + mapperRegistry, indicesFieldDataCache, globalCheckpointSyncer, searchOperationListeners, indexOperationListeners); } /** diff --git a/core/src/main/java/org/elasticsearch/index/IndexService.java b/core/src/main/java/org/elasticsearch/index/IndexService.java index f5e5ce91d80..1ef66ce5f2d 100644 --- a/core/src/main/java/org/elasticsearch/index/IndexService.java +++ b/core/src/main/java/org/elasticsearch/index/IndexService.java @@ -84,6 +84,7 @@ import java.util.Set; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; import static java.util.Collections.emptyMap; import static java.util.Collections.unmodifiableMap; @@ -108,6 +109,7 @@ public final class IndexService extends AbstractIndexComponent implements IndexC private final SimilarityService similarityService; private final EngineFactory engineFactory; private final IndexWarmer warmer; + private final Consumer globalCheckpointSyncer; private volatile Map shards = emptyMap(); private final AtomicBoolean closed = new AtomicBoolean(false); private final AtomicBoolean deleted = new AtomicBoolean(false); @@ -118,6 +120,7 @@ public final class IndexService extends AbstractIndexComponent implements IndexC private volatile AsyncTranslogFSync fsyncTask; private final ThreadPool threadPool; private final BigArrays bigArrays; + private final AsyncGlobalCheckpointTask globalCheckpointTask; public IndexService(IndexSettings indexSettings, NodeEnvironment nodeEnv, SimilarityService similarityService, @@ -131,10 +134,12 @@ public final class IndexService extends AbstractIndexComponent implements IndexC IndexModule.IndexSearcherWrapperFactory wrapperFactory, MapperRegistry mapperRegistry, IndicesFieldDataCache indicesFieldDataCache, + Consumer globalCheckpointSyncer, List searchOperationListeners, List indexingOperationListeners) throws IOException { super(indexSettings); this.indexSettings = indexSettings; + this.globalCheckpointSyncer = globalCheckpointSyncer; this.analysisService = registry.build(indexSettings); this.similarityService = similarityService; this.mapperService = new MapperService(indexSettings, analysisService, similarityService, mapperRegistry, @@ -156,6 +161,7 @@ public final class IndexService extends AbstractIndexComponent implements IndexC this.engineFactory = engineFactory; // initialize this last -- otherwise if the wrapper requires any other member to be non-null we fail with an NPE this.searcherWrapper = wrapperFactory.newWrapper(this); + this.globalCheckpointTask = new AsyncGlobalCheckpointTask(this); this.indexingOperationListeners = Collections.unmodifiableList(indexingOperationListeners); this.searchOperationListeners = Collections.unmodifiableList(searchOperationListeners); // kick off async ops for the first shard in this index @@ -236,7 +242,7 @@ public final class IndexService extends AbstractIndexComponent implements IndexC } } } finally { - IOUtils.close(bitsetFilterCache, indexCache, indexFieldData, analysisService, refreshTask, fsyncTask); + IOUtils.close(bitsetFilterCache, indexCache, indexFieldData, analysisService, refreshTask, fsyncTask, globalCheckpointTask); } } } @@ -339,6 +345,7 @@ public final class IndexService extends AbstractIndexComponent implements IndexC } else { indexShard = new IndexShard(routing, this.indexSettings, path, store, indexCache, mapperService, similarityService, indexFieldData, engineFactory, eventListener, searcherWrapper, threadPool, bigArrays, engineWarmer, + () -> globalCheckpointSyncer.accept(shardId), searchOperationListeners, indexingOperationListeners); } eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created"); @@ -718,6 +725,31 @@ public final class IndexService extends AbstractIndexComponent implements IndexC } } + private void maybeUpdateGlobalCheckpoints() { + for (IndexShard shard : this.shards.values()) { + if (shard.routingEntry().primary()) { + switch (shard.state()) { + case CREATED: + case RECOVERING: + case CLOSED: + case RELOCATED: + continue; + case POST_RECOVERY: + case STARTED: + try { + shard.updateGlobalCheckpointOnPrimary(); + } catch (EngineClosedException | AlreadyClosedException ex) { + // fine - continue, the shard was concurrently closed on us. + } + continue; + default: + throw new IllegalStateException("unknown state: " + shard.state()); + } + } + } + } + + static abstract class BaseAsyncTask implements Runnable, Closeable { protected final IndexService indexService; protected final ThreadPool threadPool; @@ -825,6 +857,7 @@ public final class IndexService extends AbstractIndexComponent implements IndexC protected String getThreadPool() { return ThreadPool.Names.FLUSH; } + @Override protected void runInternal() { indexService.maybeFSyncTranslogs(); @@ -858,6 +891,23 @@ public final class IndexService extends AbstractIndexComponent implements IndexC } } + final class AsyncGlobalCheckpointTask extends BaseAsyncTask { + + AsyncGlobalCheckpointTask(IndexService indexService) { + super(indexService, indexService.getIndexSettings().getGlobalCheckpointInterval()); + } + + @Override + protected void runInternal() { + indexService.maybeUpdateGlobalCheckpoints(); + } + + @Override + public String toString() { + return "global_checkpoint"; + } + } + AsyncRefreshTask getRefreshTask() { // for tests return refreshTask; } @@ -865,4 +915,8 @@ public final class IndexService extends AbstractIndexComponent implements IndexC AsyncTranslogFSync getFsyncTask() { // for tests return fsyncTask; } + + AsyncGlobalCheckpointTask getGlobalCheckpointTask() { // for tests + return globalCheckpointTask; + } } diff --git a/core/src/main/java/org/elasticsearch/index/IndexSettings.java b/core/src/main/java/org/elasticsearch/index/IndexSettings.java index 7c8cb4ff8c8..f378f0951d7 100644 --- a/core/src/main/java/org/elasticsearch/index/IndexSettings.java +++ b/core/src/main/java/org/elasticsearch/index/IndexSettings.java @@ -106,6 +106,9 @@ public final class IndexSettings { Setting.byteSizeSetting("index.translog.flush_threshold_size", new ByteSizeValue(512, ByteSizeUnit.MB), Property.Dynamic, Property.IndexScope); + public static final Setting INDEX_SEQ_NO_CHECKPOINT_SYNC_INTERVAL = + Setting.timeSetting("index.seq_no.checkpoint_sync_interval", new TimeValue(30, TimeUnit.SECONDS), + new TimeValue(-1, TimeUnit.MILLISECONDS), Property.Dynamic, Property.IndexScope); /** * Index setting to enable / disable deletes garbage collection. @@ -136,6 +139,7 @@ public final class IndexSettings { private volatile Translog.Durability durability; private final TimeValue syncInterval; private volatile TimeValue refreshInterval; + private final TimeValue globalCheckpointInterval; private volatile ByteSizeValue flushThresholdSize; private final MergeSchedulerConfig mergeSchedulerConfig; private final MergePolicyConfig mergePolicyConfig; @@ -222,6 +226,7 @@ public final class IndexSettings { this.durability = scopedSettings.get(INDEX_TRANSLOG_DURABILITY_SETTING); syncInterval = INDEX_TRANSLOG_SYNC_INTERVAL_SETTING.get(settings); refreshInterval = scopedSettings.get(INDEX_REFRESH_INTERVAL_SETTING); + globalCheckpointInterval = scopedSettings.get(INDEX_SEQ_NO_CHECKPOINT_SYNC_INTERVAL); flushThresholdSize = scopedSettings.get(INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING); mergeSchedulerConfig = new MergeSchedulerConfig(this); gcDeletesInMillis = scopedSettings.get(INDEX_GC_DELETES_SETTING).getMillis(); @@ -437,6 +442,13 @@ public final class IndexSettings { return refreshInterval; } + /** + * Returns this interval in which the primary shards of this index should check and advance the global checkpoint + */ + public TimeValue getGlobalCheckpointInterval() { + return globalCheckpointInterval; + } + /** * Returns the transaction log threshold size when to forcefully flush the index and clear the transaction log. */ diff --git a/core/src/main/java/org/elasticsearch/index/engine/Engine.java b/core/src/main/java/org/elasticsearch/index/engine/Engine.java index 8c89ec4e161..d8927bea290 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -28,15 +28,12 @@ import org.apache.lucene.index.IndexWriter; import org.apache.lucene.index.LeafReader; import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.index.SegmentCommitInfo; -import org.apache.lucene.index.SegmentInfo; import org.apache.lucene.index.SegmentInfos; import org.apache.lucene.index.SegmentReader; import org.apache.lucene.index.SnapshotDeletionPolicy; import org.apache.lucene.index.Term; import org.apache.lucene.search.IndexSearcher; -import org.apache.lucene.search.Query; import org.apache.lucene.search.SearcherManager; -import org.apache.lucene.search.join.BitSetProducer; import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.store.Directory; import org.apache.lucene.store.IOContext; @@ -64,10 +61,8 @@ import org.elasticsearch.index.mapper.ParseContext.Document; import org.elasticsearch.index.mapper.ParsedDocument; import org.elasticsearch.index.mapper.Uid; import org.elasticsearch.index.merge.MergeStats; -import org.elasticsearch.index.seqno.SeqNoStats; import org.elasticsearch.index.seqno.SequenceNumbersService; import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.index.shard.TranslogRecoveryPerformer; import org.elasticsearch.index.store.Store; import org.elasticsearch.index.translog.Translog; @@ -76,7 +71,6 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.nio.file.NoSuchFileException; import java.util.Arrays; -import java.util.Collection; import java.util.Comparator; import java.util.HashMap; import java.util.List; @@ -393,8 +387,8 @@ public abstract class Engine implements Closeable { return new CommitStats(getLastCommittedSegmentInfos()); } - /** get sequence number related stats */ - public abstract SeqNoStats seqNoStats(); + /** get the sequence number service */ + public abstract SequenceNumbersService seqNoService(); /** * Read the last segments info from the commit pointed to by the searcher manager diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 78c832c846e..bb1b59ad9a8 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -56,7 +56,6 @@ import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.mapper.Uid; import org.elasticsearch.index.merge.MergeStats; import org.elasticsearch.index.merge.OnGoingMerge; -import org.elasticsearch.index.seqno.SeqNoStats; import org.elasticsearch.index.seqno.SequenceNumbersService; import org.elasticsearch.index.shard.ElasticsearchMergePolicy; import org.elasticsearch.index.shard.ShardId; @@ -1149,7 +1148,7 @@ public class InternalEngine extends Engine { } @Override - public SeqNoStats seqNoStats() { - return seqNoService.stats(); + public SequenceNumbersService seqNoService() { + return seqNoService; } } diff --git a/core/src/main/java/org/elasticsearch/index/engine/ShadowEngine.java b/core/src/main/java/org/elasticsearch/index/engine/ShadowEngine.java index 0d875763567..6a5f30cbe1e 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/ShadowEngine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/ShadowEngine.java @@ -30,7 +30,7 @@ import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.ReleasableLock; -import org.elasticsearch.index.seqno.SeqNoStats; +import org.elasticsearch.index.seqno.SequenceNumbersService; import org.elasticsearch.index.translog.Translog; import java.io.IOException; @@ -250,7 +250,7 @@ public class ShadowEngine extends Engine { } @Override - public SeqNoStats seqNoStats() { + public SequenceNumbersService seqNoService() { throw new UnsupportedOperationException("ShadowEngine doesn't track sequence numbers"); } diff --git a/core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointService.java b/core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointService.java new file mode 100644 index 00000000000..1bc34089238 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointService.java @@ -0,0 +1,244 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.index.seqno; + +import com.carrotsearch.hppc.ObjectLongHashMap; +import com.carrotsearch.hppc.ObjectLongMap; +import com.carrotsearch.hppc.cursors.ObjectLongCursor; +import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.shard.AbstractIndexShardComponent; +import org.elasticsearch.index.shard.ShardId; + +import java.util.Set; + +/** + * A shard component that is responsible of tracking the global checkpoint. The global checkpoint + * is the highest seq_no for which all lower (or equal) seq_no have been processed on all shards that + * are currently active. Since shards count as "active" when the master starts them, and before this primary shard + * has been notified of this fact, we also include shards in that are in the + * {@link org.elasticsearch.index.shard.IndexShardState#POST_RECOVERY} state when checking for global checkpoint advancement. + * We call these shards "in sync" with all operations on the primary (see {@link #inSyncLocalCheckpoints}. + * + *

+ * The global checkpoint is maintained by the primary shard and is replicated to all the replicas + * (via {@link GlobalCheckpointSyncAction}). + */ +public class GlobalCheckpointService extends AbstractIndexShardComponent { + + /** + * This map holds the last known local checkpoint for every shard copy that's active. + * All shard copies in this map participate in determining the global checkpoint + * keyed by allocation ids + */ + final private ObjectLongMap activeLocalCheckpoints; + + /** + * This map holds the last known local checkpoint for every initializing shard copy that's has been brought up + * to speed through recovery. These shards are treated as valid copies and participate in determining the global + * checkpoint. + *

+ * Keyed by allocation ids. + */ + final private ObjectLongMap inSyncLocalCheckpoints; // keyed by allocation ids + + + /** + * This map holds the last known local checkpoint for every initializing shard copy that is still undergoing recovery. + * These shards do not participate in determining the global checkpoint. This map is needed to make sure that when + * shards are promoted to {@link #inSyncLocalCheckpoints} we use the highest known checkpoint, even if we index concurrently + * while recovering the shard. + * Keyed by allocation ids + */ + final private ObjectLongMap trackingLocalCheckpoint; + + private long globalCheckpoint = SequenceNumbersService.UNASSIGNED_SEQ_NO; + + public GlobalCheckpointService(ShardId shardId, IndexSettings indexSettings) { + super(shardId, indexSettings); + activeLocalCheckpoints = new ObjectLongHashMap<>(1 + indexSettings.getNumberOfReplicas()); + inSyncLocalCheckpoints = new ObjectLongHashMap<>(indexSettings.getNumberOfReplicas()); + trackingLocalCheckpoint = new ObjectLongHashMap<>(indexSettings.getNumberOfReplicas()); + } + + + /** + * notifies the service of a local checkpoint. if the checkpoint is lower than the currently known one, + * this is a noop. Last, if the allocation id is not yet known, it is ignored. This to prevent late + * arrivals from shards that are removed to be re-added. + */ + synchronized public void updateLocalCheckpoint(String allocationId, long localCheckpoint) { + if (updateLocalCheckpointInMap(allocationId, localCheckpoint, activeLocalCheckpoints, "active")) { + return; + } + if (updateLocalCheckpointInMap(allocationId, localCheckpoint, inSyncLocalCheckpoints, "inSync")) { + return; + } + if (updateLocalCheckpointInMap(allocationId, localCheckpoint, trackingLocalCheckpoint, "tracking")) { + return; + } + logger.trace("local checkpoint of [{}] ([{}]) wasn't found in any map. ignoring.", allocationId, localCheckpoint); + } + + private boolean updateLocalCheckpointInMap(String allocationId, long localCheckpoint, + ObjectLongMap checkpointsMap, String name) { + assert Thread.holdsLock(this); + int indexOfKey = checkpointsMap.indexOf(allocationId); + if (indexOfKey < 0) { + return false; + } + long current = checkpointsMap.indexGet(indexOfKey); + // nocommit: this can change when we introduces rollback/resync + if (current < localCheckpoint) { + checkpointsMap.indexReplace(indexOfKey, localCheckpoint); + if (logger.isTraceEnabled()) { + logger.trace("updated local checkpoint of [{}] to [{}] (type [{}])", allocationId, localCheckpoint, + name); + } + } else { + logger.trace("skipping update local checkpoint [{}], current check point is higher " + + "(current [{}], incoming [{}], type [{}])", + allocationId, current, localCheckpoint, allocationId); + } + return true; + } + + /** + * Scans through the currently known local checkpoints and updates the global checkpoint accordingly. + * + * @return true if the checkpoint has been updated or if it can not be updated since one of the local checkpoints + * of one of the active allocations is not known. + */ + synchronized public boolean updateCheckpointOnPrimary() { + long minCheckpoint = Long.MAX_VALUE; + if (activeLocalCheckpoints.isEmpty() && inSyncLocalCheckpoints.isEmpty()) { + return false; + } + for (ObjectLongCursor cp : activeLocalCheckpoints) { + if (cp.value == SequenceNumbersService.UNASSIGNED_SEQ_NO) { + logger.trace("unknown local checkpoint for active allocationId [{}], requesting a sync", cp.key); + return true; + } + minCheckpoint = Math.min(cp.value, minCheckpoint); + } + for (ObjectLongCursor cp : inSyncLocalCheckpoints) { + assert cp.value != SequenceNumbersService.UNASSIGNED_SEQ_NO : + "in sync allocation ids can not have an unknown checkpoint (aId [" + cp.key + "])"; + minCheckpoint = Math.min(cp.value, minCheckpoint); + } + if (minCheckpoint < globalCheckpoint) { + // nocommit: if this happens - do you we fail the shard? + throw new IllegalStateException(shardId + " new global checkpoint [" + minCheckpoint + + "] is lower than previous one [" + globalCheckpoint + "]"); + } + if (globalCheckpoint != minCheckpoint) { + logger.trace("global checkpoint updated to [{}]", minCheckpoint); + globalCheckpoint = minCheckpoint; + return true; + } + return false; + } + + /** + * gets the current global checkpoint. See java docs for {@link GlobalCheckpointService} for more details + */ + synchronized public long getCheckpoint() { + return globalCheckpoint; + } + + /** + * updates the global checkpoint on a replica shard (after it has been updated by the primary). + */ + synchronized public void updateCheckpointOnReplica(long globalCheckpoint) { + if (this.globalCheckpoint <= globalCheckpoint) { + this.globalCheckpoint = globalCheckpoint; + logger.trace("global checkpoint updated from primary to [{}]", globalCheckpoint); + } else { + // nocommit: fail the shard? + throw new IllegalArgumentException("global checkpoint from primary should never decrease. current [" + + this.globalCheckpoint + "], got [" + globalCheckpoint + "]"); + + } + } + + /** + * Notifies the service of the current allocation ids in the cluster state. This method trims any shards that + * have been removed and adds/promotes any active allocations to the {@link #activeLocalCheckpoints}. + * + * @param activeAllocationIds the allocation ids of the currently active shard copies + * @param initializingAllocationIds the allocation ids of the currently initializing shard copies + */ + synchronized public void updateAllocationIdsFromMaster(Set activeAllocationIds, + Set initializingAllocationIds) { + activeLocalCheckpoints.removeAll(key -> activeAllocationIds.contains(key) == false); + for (String activeId : activeAllocationIds) { + if (activeLocalCheckpoints.containsKey(activeId) == false) { + long knownCheckpoint = trackingLocalCheckpoint.getOrDefault(activeId, SequenceNumbersService.UNASSIGNED_SEQ_NO); + knownCheckpoint = inSyncLocalCheckpoints.getOrDefault(activeId, knownCheckpoint); + activeLocalCheckpoints.put(activeId, knownCheckpoint); + logger.trace("marking [{}] as active. known checkpoint [{}]", activeId, knownCheckpoint); + } + } + inSyncLocalCheckpoints.removeAll(key -> initializingAllocationIds.contains(key) == false); + trackingLocalCheckpoint.removeAll(key -> initializingAllocationIds.contains(key) == false); + // add initializing shards to tracking + for (String initID : initializingAllocationIds) { + if (inSyncLocalCheckpoints.containsKey(initID)) { + continue; + } + if (trackingLocalCheckpoint.containsKey(initID)) { + continue; + } + trackingLocalCheckpoint.put(initID, SequenceNumbersService.UNASSIGNED_SEQ_NO); + logger.trace("added [{}] to the tracking map due to a CS update", initID); + + } + } + + /** + * marks the allocationId as "in sync" with the primary shard. This should be called at the end of recovery + * where the primary knows all operation bellow the global checkpoint have been completed on this shard. + * + * @param allocationId allocationId of the recovering shard + * @param localCheckpoint the local checkpoint of the shard in question + */ + synchronized public void markAllocationIdAsInSync(String allocationId, long localCheckpoint) { + if (trackingLocalCheckpoint.containsKey(allocationId) == false) { + // master have change its mind and removed this allocation, ignore. + return; + } + long current = trackingLocalCheckpoint.remove(allocationId); + localCheckpoint = Math.max(current, localCheckpoint); + logger.trace("marked [{}] as in sync with a local checkpoint of [{}]", allocationId, localCheckpoint); + inSyncLocalCheckpoints.put(allocationId, localCheckpoint); + } + + // for testing + synchronized long getLocalCheckpointForAllocation(String allocationId) { + if (activeLocalCheckpoints.containsKey(allocationId)) { + return activeLocalCheckpoints.get(allocationId); + } + if (inSyncLocalCheckpoints.containsKey(allocationId)) { + return inSyncLocalCheckpoints.get(allocationId); + } + if (trackingLocalCheckpoint.containsKey(allocationId)) { + return trackingLocalCheckpoint.get(allocationId); + } + return SequenceNumbersService.UNASSIGNED_SEQ_NO; + } +} diff --git a/core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncAction.java b/core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncAction.java new file mode 100644 index 00000000000..06f4f3bad16 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncAction.java @@ -0,0 +1,132 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.index.seqno; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ReplicationResponse; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.replication.ReplicationRequest; +import org.elasticsearch.action.support.replication.TransportReplicationAction; +import org.elasticsearch.cluster.action.shard.ShardStateAction; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.collect.Tuple; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.IndexService; +import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; + +import java.io.IOException; + +public class GlobalCheckpointSyncAction extends TransportReplicationAction { + + public static String ACTION_NAME = "indices:admin/seq_no/global_checkpoint_sync"; + + @Inject + public GlobalCheckpointSyncAction(Settings settings, TransportService transportService, + ClusterService clusterService, IndicesService indicesService, + ThreadPool threadPool, ShardStateAction shardStateAction, + ActionFilters actionFilters, + IndexNameExpressionResolver indexNameExpressionResolver) { + super(settings, ACTION_NAME, transportService, clusterService, indicesService, threadPool, shardStateAction, + actionFilters, indexNameExpressionResolver, PrimaryRequest::new, ReplicaRequest::new, + ThreadPool.Names.SAME); + } + + @Override + protected ReplicationResponse newResponseInstance() { + return new ReplicationResponse(); + } + + @Override + protected Tuple shardOperationOnPrimary(PrimaryRequest request) { + IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex()); + IndexShard indexShard = indexService.getShard(request.shardId().id()); + long checkpoint = indexShard.getGlobalCheckpoint(); + return new Tuple<>(new ReplicationResponse(), new ReplicaRequest(request, checkpoint)); + } + + @Override + protected void shardOperationOnReplica(ReplicaRequest request) { + IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex()); + IndexShard indexShard = indexService.getShard(request.shardId().id()); + indexShard.updateGlobalCheckpointOnReplica(request.checkpoint); + } + + public void updateCheckpointForShard(ShardId shardId) { + execute(new PrimaryRequest(shardId), new ActionListener() { + @Override + public void onResponse(ReplicationResponse replicationResponse) { + if (logger.isTraceEnabled()) { + logger.trace("{} global checkpoint successfully updated (shard info [{}])", shardId, + replicationResponse.getShardInfo()); + } + } + + @Override + public void onFailure(Throwable e) { + logger.debug("{} failed to update global checkpoint", e, shardId); + } + }); + } + + final static class PrimaryRequest extends ReplicationRequest { + + private PrimaryRequest() { + super(); + } + + PrimaryRequest(ShardId shardId) { + super(shardId); + } + } + + final static class ReplicaRequest extends ReplicationRequest { + + public long checkpoint; + + private ReplicaRequest() { + } + + ReplicaRequest(PrimaryRequest primaryRequest, long checkpoint) { + super(primaryRequest.shardId()); + this.checkpoint = checkpoint; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + checkpoint = in.readZLong(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeZLong(checkpoint); + } + } + +} diff --git a/core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointService.java b/core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointService.java index 8b5cb4a4616..d20432e5071 100644 --- a/core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointService.java +++ b/core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointService.java @@ -19,6 +19,7 @@ package org.elasticsearch.index.seqno; import org.apache.lucene.util.FixedBitSet; +import org.elasticsearch.common.settings.Setting; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.shard.AbstractIndexShardComponent; import org.elasticsearch.index.shard.ShardId; @@ -35,10 +36,8 @@ public class LocalCheckpointService extends AbstractIndexShardComponent { * we keep a bit for each seq No that is still pending. to optimize allocation, we do so in multiple arrays * allocating them on demand and cleaning up while completed. This setting controls the size of the arrays */ - public static String SETTINGS_BIT_ARRAYS_SIZE = "index.seq_no.checkpoint.bit_arrays_size"; - - /** default value for {@link #SETTINGS_BIT_ARRAYS_SIZE} */ - final static int DEFAULT_BIT_ARRAYS_SIZE = 1024; + public static Setting SETTINGS_BIT_ARRAYS_SIZE = Setting.intSetting("index.seq_no.checkpoint.bit_arrays_size", 1024, + 4, Setting.Property.IndexScope); /** @@ -50,7 +49,7 @@ public class LocalCheckpointService extends AbstractIndexShardComponent { long firstProcessedSeqNo = 0; /** the current local checkpoint, i.e., all seqNo lower (<=) than this number have been completed */ - volatile long checkpoint = SequenceNumbersService.UNASSIGNED_SEQ_NO; + volatile long checkpoint = SequenceNumbersService.NO_OPS_PERFORMED; /** the next available seqNo - used for seqNo generation */ volatile long nextSeqNo = 0; @@ -58,10 +57,7 @@ public class LocalCheckpointService extends AbstractIndexShardComponent { public LocalCheckpointService(ShardId shardId, IndexSettings indexSettings) { super(shardId, indexSettings); - bitArraysSize = indexSettings.getSettings().getAsInt(SETTINGS_BIT_ARRAYS_SIZE, DEFAULT_BIT_ARRAYS_SIZE); - if (bitArraysSize <= 0) { - throw new IllegalArgumentException("[" + SETTINGS_BIT_ARRAYS_SIZE + "] must be positive. got [" + bitArraysSize + "]"); - } + bitArraysSize = SETTINGS_BIT_ARRAYS_SIZE.get(indexSettings.getSettings()); processedSeqNo = new LinkedList<>(); } diff --git a/core/src/main/java/org/elasticsearch/index/seqno/SeqNoStats.java b/core/src/main/java/org/elasticsearch/index/seqno/SeqNoStats.java index da9d0023cae..48c02058b0b 100644 --- a/core/src/main/java/org/elasticsearch/index/seqno/SeqNoStats.java +++ b/core/src/main/java/org/elasticsearch/index/seqno/SeqNoStats.java @@ -31,19 +31,22 @@ public class SeqNoStats implements ToXContent, Writeable { private static final String SEQ_NO = "seq_no"; private static final String MAX_SEQ_NO = "max"; private static final String LOCAL_CHECKPOINT = "local_checkpoint"; + private static final String GLOBAL_CHECKPOINT = "global_checkpoint"; - public static final SeqNoStats PROTO = new SeqNoStats(0,0); + public static final SeqNoStats PROTO = new SeqNoStats(0, 0, 0); - final long maxSeqNo; - final long localCheckpoint; + private final long maxSeqNo; + private final long localCheckpoint; + private final long globalCheckpoint; - public SeqNoStats(long maxSeqNo, long localCheckpoint) { + public SeqNoStats(long maxSeqNo, long localCheckpoint, long globalCheckpoint) { this.maxSeqNo = maxSeqNo; this.localCheckpoint = localCheckpoint; + this.globalCheckpoint = globalCheckpoint; } public SeqNoStats(StreamInput in) throws IOException { - this(in.readZLong(), in.readZLong()); + this(in.readZLong(), in.readZLong(), in.readZLong()); } /** the maximum sequence number seen so far */ @@ -56,10 +59,15 @@ public class SeqNoStats implements ToXContent, Writeable { return localCheckpoint; } + public long getGlobalCheckpoint() { + return globalCheckpoint; + } + @Override public void writeTo(StreamOutput out) throws IOException { out.writeZLong(maxSeqNo); out.writeZLong(localCheckpoint); + out.writeZLong(globalCheckpoint); } @Override @@ -67,8 +75,8 @@ public class SeqNoStats implements ToXContent, Writeable { builder.startObject(SEQ_NO); builder.field(MAX_SEQ_NO, maxSeqNo); builder.field(LOCAL_CHECKPOINT, localCheckpoint); + builder.field(GLOBAL_CHECKPOINT, globalCheckpoint); builder.endObject(); return builder; } - } diff --git a/core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbersService.java b/core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbersService.java index 3ef8607c4c2..fe851726570 100644 --- a/core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbersService.java +++ b/core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbersService.java @@ -22,15 +22,22 @@ import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.shard.AbstractIndexShardComponent; import org.elasticsearch.index.shard.ShardId; -/** a very light weight implementation. will be replaced with proper machinery later */ +import java.util.Set; + +/** + * a very light weight implementation. will be replaced with proper machinery later + */ public class SequenceNumbersService extends AbstractIndexShardComponent { - public final static long UNASSIGNED_SEQ_NO = -1L; + public final static long UNASSIGNED_SEQ_NO = -2L; + public final static long NO_OPS_PERFORMED = -1L; final LocalCheckpointService localCheckpointService; + final GlobalCheckpointService globalCheckpointService; public SequenceNumbersService(ShardId shardId, IndexSettings indexSettings) { super(shardId, indexSettings); localCheckpointService = new LocalCheckpointService(shardId, indexSettings); + globalCheckpointService = new GlobalCheckpointService(shardId, indexSettings); } /** @@ -54,6 +61,62 @@ public class SequenceNumbersService extends AbstractIndexShardComponent { * Gets sequence number related stats */ public SeqNoStats stats() { - return new SeqNoStats(localCheckpointService.getMaxSeqNo(), localCheckpointService.getCheckpoint()); + return new SeqNoStats(localCheckpointService.getMaxSeqNo(), localCheckpointService.getCheckpoint(), + globalCheckpointService.getCheckpoint()); + } + + /** + * notifies the service of a local checkpoint. + * see {@link GlobalCheckpointService#updateLocalCheckpoint(String, long)} for details. + */ + public void updateLocalCheckpointForShard(String allocationId, long checkpoint) { + globalCheckpointService.updateLocalCheckpoint(allocationId, checkpoint); + } + + /** + * marks the allocationId as "in sync" with the primary shard. + * see {@link GlobalCheckpointService#markAllocationIdAsInSync(String, long)} for details. + * + * @param allocationId allocationId of the recovering shard + * @param localCheckpoint the local checkpoint of the shard in question + */ + public void markAllocationIdAsInSync(String allocationId, long localCheckpoint) { + globalCheckpointService.markAllocationIdAsInSync(allocationId, localCheckpoint); + } + + public long getLocalCheckpoint() { + return localCheckpointService.getCheckpoint(); + } + + public long getGlobalCheckpoint() { + return globalCheckpointService.getCheckpoint(); + } + + /** + * updates the global checkpoint on a replica shard (after it has been updated by the primary). + */ + public void updateGlobalCheckpointOnReplica(long checkpoint) { + globalCheckpointService.updateCheckpointOnReplica(checkpoint); + } + + /** + * Notifies the service of the current allocation ids in the cluster state. + * see {@link GlobalCheckpointService#updateAllocationIdsFromMaster(Set, Set)} for details. + * + * @param activeAllocationIds the allocation ids of the currently active shard copies + * @param initializingAllocationIds the allocation ids of the currently initializing shard copies + */ + public void updateAllocationIdsFromMaster(Set activeAllocationIds, Set initializingAllocationIds) { + globalCheckpointService.updateAllocationIdsFromMaster(activeAllocationIds, initializingAllocationIds); + } + + /** + * Scans through the currently known local checkpoint and updates the global checkpoint accordingly. + * + * @return true if the checkpoint has been updated or if it can not be updated since one of the local checkpoints + * of one of the active allocations is not known. + */ + public boolean updateGlobalCheckpointOnPrimary() { + return globalCheckpointService.updateCheckpointOnPrimary(); } } diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index e98ce394ffe..2596579f1b9 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -90,6 +90,7 @@ import org.elasticsearch.index.recovery.RecoveryStats; import org.elasticsearch.index.refresh.RefreshStats; import org.elasticsearch.index.search.stats.SearchStats; import org.elasticsearch.index.search.stats.ShardSearchStats; +import org.elasticsearch.index.seqno.GlobalCheckpointService; import org.elasticsearch.index.seqno.SeqNoStats; import org.elasticsearch.index.seqno.SequenceNumbersService; import org.elasticsearch.index.similarity.SimilarityService; @@ -122,6 +123,7 @@ import java.util.EnumSet; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -193,6 +195,9 @@ public class IndexShard extends AbstractIndexShardComponent { private static final EnumSet writeAllowedStatesForReplica = EnumSet.of(IndexShardState.RECOVERING, IndexShardState.POST_RECOVERY, IndexShardState.STARTED, IndexShardState.RELOCATED); private final IndexSearcherWrapper searcherWrapper; + + private final Runnable globalCheckpointSyncer; + /** * True if this shard is still indexing (recently) and false if we've been idle for long enough (as periodically checked by {@link * IndexingMemoryController}). @@ -203,7 +208,8 @@ public class IndexShard extends AbstractIndexShardComponent { MapperService mapperService, SimilarityService similarityService, IndexFieldDataService indexFieldDataService, @Nullable EngineFactory engineFactory, IndexEventListener indexEventListener, IndexSearcherWrapper indexSearcherWrapper, ThreadPool threadPool, BigArrays bigArrays, - Engine.Warmer warmer, List searchOperationListener, List listeners) throws IOException { + Engine.Warmer warmer, Runnable globalCheckpointSyncer, + List searchOperationListener, List listeners) throws IOException { super(shardRouting.shardId(), indexSettings); assert shardRouting.initializing(); this.shardRouting = shardRouting; @@ -226,6 +232,7 @@ public class IndexShard extends AbstractIndexShardComponent { final List searchListenersList = new ArrayList<>(searchOperationListener); searchListenersList.add(searchStats); this.searchOperationListener = new SearchOperationListener.CompositeListener(searchListenersList, logger); + this.globalCheckpointSyncer = globalCheckpointSyncer; this.getService = new ShardGetService(indexSettings, this, mapperService); this.shardWarmerService = new ShardIndexWarmerService(shardId, indexSettings); this.shardQueryCache = new ShardRequestCache(); @@ -531,9 +538,7 @@ public class IndexShard extends AbstractIndexShardComponent { } public Engine.Delete prepareDeleteOnReplica(String type, String id, long seqNo, long version, VersionType versionType) { - if (shardRouting.primary() && shardRouting.isRelocationTarget() == false) { - throw new IllegalIndexShardStateException(shardId, state, "shard is not a replica"); - } + verifyReplicationTarget(); final DocumentMapper documentMapper = docMapper(type).getDocumentMapper(); final MappedFieldType uidFieldType = documentMapper.uidMapper().fieldType(); final Query uidQuery = uidFieldType.termQuery(Uid.createUid(type, id), null); @@ -641,7 +646,7 @@ public class IndexShard extends AbstractIndexShardComponent { @Nullable public SeqNoStats seqNoStats() { Engine engine = getEngineOrNull(); - return engine == null ? null : engine.seqNoStats(); + return engine == null ? null : engine.seqNoService().stats(); } public IndexingStats indexingStats(String... types) { @@ -1253,6 +1258,69 @@ public class IndexShard extends AbstractIndexShardComponent { } } + /** + * notifies the service of a local checkpoint. see {@link GlobalCheckpointService#updateLocalCheckpoint(String, long)} for details. + */ + public void updateLocalCheckpointForShard(String allocationId, long checkpoint) { + verifyPrimary(); + getEngine().seqNoService().updateLocalCheckpointForShard(allocationId, checkpoint); + } + + /** + * marks the allocationId as "in sync" with the primary shard. see {@link GlobalCheckpointService#markAllocationIdAsInSync(String, long)} for details. + * + * @param allocationId allocationId of the recovering shard + * @param localCheckpoint the local checkpoint of the shard in question + */ + public void markAllocationIdAsInSync(String allocationId, long localCheckpoint) { + verifyPrimary(); + getEngine().seqNoService().markAllocationIdAsInSync(allocationId, localCheckpoint); + } + + public long getLocalCheckpoint() { + return getEngine().seqNoService().getLocalCheckpoint(); + } + + public long getGlobalCheckpoint() { + return getEngine().seqNoService().getGlobalCheckpoint(); + } + + /** + * checks whether the global checkpoint can be updated based on current knowledge of local checkpoints on the different + * shard copies. The checkpoint is updated or more information is required from the replica, a globack checkpoint sync + * is initiated. + */ + public void updateGlobalCheckpointOnPrimary() { + verifyPrimary(); + if (getEngine().seqNoService().updateGlobalCheckpointOnPrimary()) { + globalCheckpointSyncer.run(); + } + } + + /** + * updates the global checkpoint on a replica shard (after it has been updated by the primary). + */ + public void updateGlobalCheckpointOnReplica(long checkpoint) { + verifyReplicationTarget(); + getEngine().seqNoService().updateGlobalCheckpointOnReplica(checkpoint); + } + + /** + * Notifies the service of the current allocation ids in the cluster state. + * see {@link GlobalCheckpointService#updateAllocationIdsFromMaster(Set, Set)} for details. + * + * @param activeAllocationIds the allocation ids of the currently active shard copies + * @param initializingAllocationIds the allocation ids of the currently initializing shard copies + */ + public void updateAllocationIdsFromMaster(Set activeAllocationIds, Set initializingAllocationIds) { + verifyPrimary(); + Engine engine = getEngineOrNull(); + // if engine is not yet started, we are not ready yet and can just ignore this + if (engine != null) { + engine.seqNoService().updateAllocationIdsFromMaster(activeAllocationIds, initializingAllocationIds); + } + } + /** * Should be called for each no-op update operation to increment relevant statistics. * @@ -1644,4 +1712,9 @@ public class IndexShard extends AbstractIndexShardComponent { } } + // for tests + Runnable getGlobalCheckpointSyncer() { + return globalCheckpointSyncer; + } + } diff --git a/core/src/main/java/org/elasticsearch/index/shard/ShadowIndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/ShadowIndexShard.java index 8a45a3a9183..afa670da0f7 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/ShadowIndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/ShadowIndexShard.java @@ -53,7 +53,8 @@ public final class ShadowIndexShard extends IndexShard { ThreadPool threadPool, BigArrays bigArrays, Engine.Warmer engineWarmer, List searchOperationListeners) throws IOException { super(shardRouting, indexSettings, path, store, indexCache, mapperService, similarityService, indexFieldDataService, engineFactory, - indexEventListener, wrapper, threadPool, bigArrays, engineWarmer, searchOperationListeners, Collections.emptyList()); + indexEventListener, wrapper, threadPool, bigArrays, engineWarmer, () -> { + }, searchOperationListeners, Collections.emptyList()); } /** @@ -102,4 +103,22 @@ public final class ShadowIndexShard extends IndexShard { public TranslogStats translogStats() { return null; // shadow engine has no translog } + + + @Override + public void updateGlobalCheckpointOnReplica(long checkpoint) { + // nocommit: think shadow replicas through + } + + @Override + public long getLocalCheckpoint() { + // nocommit: think shadow replicas through + return -1; + } + + @Override + public long getGlobalCheckpoint() { + // nocommit: think shadow replicas through + return -1; + } } diff --git a/core/src/main/java/org/elasticsearch/indices/IndicesModule.java b/core/src/main/java/org/elasticsearch/indices/IndicesModule.java index 818c84b47ee..0040378fb00 100644 --- a/core/src/main/java/org/elasticsearch/indices/IndicesModule.java +++ b/core/src/main/java/org/elasticsearch/indices/IndicesModule.java @@ -31,11 +31,10 @@ import org.elasticsearch.index.mapper.core.BooleanFieldMapper; import org.elasticsearch.index.mapper.core.CompletionFieldMapper; import org.elasticsearch.index.mapper.core.DateFieldMapper; import org.elasticsearch.index.mapper.core.KeywordFieldMapper; +import org.elasticsearch.index.mapper.core.NumberFieldMapper; import org.elasticsearch.index.mapper.core.StringFieldMapper; import org.elasticsearch.index.mapper.core.TextFieldMapper; import org.elasticsearch.index.mapper.core.TokenCountFieldMapper; -import org.elasticsearch.index.mapper.core.LegacyTokenCountFieldMapper; -import org.elasticsearch.index.mapper.core.NumberFieldMapper; import org.elasticsearch.index.mapper.geo.GeoPointFieldMapper; import org.elasticsearch.index.mapper.geo.GeoShapeFieldMapper; import org.elasticsearch.index.mapper.internal.AllFieldMapper; @@ -54,6 +53,7 @@ import org.elasticsearch.index.mapper.internal.VersionFieldMapper; import org.elasticsearch.index.mapper.ip.IpFieldMapper; import org.elasticsearch.index.mapper.object.ObjectMapper; import org.elasticsearch.index.percolator.PercolatorFieldMapper; +import org.elasticsearch.index.seqno.GlobalCheckpointSyncAction; import org.elasticsearch.indices.cluster.IndicesClusterStateService; import org.elasticsearch.indices.flush.SyncedFlushService; import org.elasticsearch.indices.mapper.MapperRegistry; @@ -162,6 +162,7 @@ public class IndicesModule extends AbstractModule { bind(UpdateHelper.class).asEagerSingleton(); bind(MetaDataIndexUpgradeService.class).asEagerSingleton(); bind(NodeServicesProvider.class).asEagerSingleton(); + bind(GlobalCheckpointSyncAction.class).asEagerSingleton(); } // public for testing diff --git a/core/src/main/java/org/elasticsearch/indices/IndicesService.java b/core/src/main/java/org/elasticsearch/indices/IndicesService.java index 4e2b2ca170f..e8ab2d5e26b 100644 --- a/core/src/main/java/org/elasticsearch/indices/IndicesService.java +++ b/core/src/main/java/org/elasticsearch/indices/IndicesService.java @@ -112,6 +112,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; import java.util.function.Predicate; import java.util.stream.Collectors; @@ -348,8 +349,7 @@ public class IndicesService extends AbstractLifecycleComponent i * @param builtInListeners a list of built-in lifecycle {@link IndexEventListener} that should should be used along side with the per-index listeners * @throws IndexAlreadyExistsException if the index already exists. */ - public synchronized IndexService createIndex(final NodeServicesProvider nodeServicesProvider, IndexMetaData indexMetaData, List builtInListeners) throws IOException { - + public synchronized IndexService createIndex(final NodeServicesProvider nodeServicesProvider, IndexMetaData indexMetaData, List builtInListeners, Consumer globalCheckpointSyncer) throws IOException { if (!lifecycle.started()) { throw new IllegalStateException("Can't create an index [" + indexMetaData.getIndex() + "], node is closed"); } @@ -369,7 +369,8 @@ public class IndicesService extends AbstractLifecycleComponent i }; finalListeners.add(onStoreClose); finalListeners.add(oldShardsStats); - final IndexService indexService = createIndexService("create index", nodeServicesProvider, indexMetaData, indicesQueryCache, indicesFieldDataCache, finalListeners, indexingMemoryController); + final IndexService indexService = createIndexService("create index", nodeServicesProvider, indexMetaData, indicesQueryCache, + indicesFieldDataCache, finalListeners, globalCheckpointSyncer, indexingMemoryController); boolean success = false; try { indexService.getIndexEventListener().afterIndexCreated(indexService); @@ -386,7 +387,12 @@ public class IndicesService extends AbstractLifecycleComponent i /** * This creates a new IndexService without registering it */ - private synchronized IndexService createIndexService(final String reason, final NodeServicesProvider nodeServicesProvider, IndexMetaData indexMetaData, IndicesQueryCache indicesQueryCache, IndicesFieldDataCache indicesFieldDataCache, List builtInListeners, IndexingOperationListener... indexingOperationListeners) throws IOException { + private synchronized IndexService createIndexService(final String reason, final NodeServicesProvider nodeServicesProvider, + IndexMetaData indexMetaData, IndicesQueryCache indicesQueryCache, + IndicesFieldDataCache indicesFieldDataCache, + List builtInListeners, + Consumer globalCheckpointSyncer, + IndexingOperationListener... indexingOperationListeners) throws IOException { final Index index = indexMetaData.getIndex(); final Predicate indexNameMatcher = (indexExpression) -> indexNameExpressionResolver.matchesIndex(index.getName(), indexExpression, clusterService.state()); final IndexSettings idxSettings = new IndexSettings(indexMetaData, this.settings, indexNameMatcher, indexScopeSetting); @@ -404,7 +410,8 @@ public class IndicesService extends AbstractLifecycleComponent i for (IndexEventListener listener : builtInListeners) { indexModule.addIndexEventListener(listener); } - return indexModule.newIndexService(nodeEnv, this, nodeServicesProvider, indicesQueryCache, mapperRegistry, indicesFieldDataCache); + return indexModule.newIndexService(nodeEnv, this, nodeServicesProvider, indicesQueryCache, mapperRegistry, globalCheckpointSyncer, + indicesFieldDataCache); } /** @@ -420,7 +427,7 @@ public class IndicesService extends AbstractLifecycleComponent i closeables.add(indicesQueryCache); // this will also fail if some plugin fails etc. which is nice since we can verify that early final IndexService service = createIndexService("metadata verification", nodeServicesProvider, - metaData, indicesQueryCache, indicesFieldDataCache, Collections.emptyList()); + metaData, indicesQueryCache, indicesFieldDataCache, Collections.emptyList(), s -> {}); for (ObjectCursor typeMapping : metaData.getMappings().values()) { // don't apply the default mapping, it has been applied when the mapping was created service.mapperService().merge(typeMapping.value.type(), typeMapping.value.source(), diff --git a/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java b/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java index 6f9b1684b9e..3cc9543009f 100644 --- a/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java +++ b/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java @@ -52,6 +52,7 @@ import org.elasticsearch.index.IndexShardAlreadyExistsException; import org.elasticsearch.index.NodeServicesProvider; import org.elasticsearch.index.mapper.DocumentMapper; import org.elasticsearch.index.mapper.MapperService; +import org.elasticsearch.index.seqno.GlobalCheckpointSyncAction; import org.elasticsearch.index.shard.IndexEventListener; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardState; @@ -75,6 +76,7 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentMap; +import java.util.stream.Collectors; /** * @@ -89,6 +91,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent activeIds = shardRoutingTable.activeShards().stream().map(sr -> sr.allocationId().getId()).collect(Collectors.toSet()); + Set initializingIds = shardRoutingTable.getAllInitializingShards().stream().map(sr -> sr.allocationId().getId()).collect(Collectors.toSet()); + indexShard.updateAllocationIdsFromMaster(activeIds, initializingIds); + } } catch (Throwable e) { failAndRemoveShard(shardRouting, indexService, true, "failed updating shard routing entry", e); } diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index b609eb5d08a..0be793e265e 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -362,7 +362,10 @@ public class RecoverySourceHandler { logger.trace("[{}][{}] finalizing recovery to {}", indexName, shardId, request.targetNode()); - cancellableThreads.execute(recoveryTarget::finalizeRecovery); + cancellableThreads.execute(() -> { + RecoveryTarget.FinalizeResponse response = recoveryTarget.finalizeRecovery(); + shard.markAllocationIdAsInSync(response.getAllocationId(), response.getLocalCheckpoint()); + }); if (isPrimaryRelocation()) { /** diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java index 29b7df0a859..82c217fc92b 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java @@ -298,8 +298,10 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget } @Override - public void finalizeRecovery() { - indexShard().finalizeRecovery(); + public FinalizeResponse finalizeRecovery() { + final IndexShard indexShard = indexShard(); + indexShard.finalizeRecovery(); + return new FinalizeResponse(indexShard.routingEntry().allocationId().getId(), indexShard.getLocalCheckpoint()); } @Override diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java index 4772e2d0a8b..4d5b411ca4a 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java @@ -19,9 +19,12 @@ package org.elasticsearch.indices.recovery; import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.index.store.Store; import org.elasticsearch.index.store.StoreFileMetaData; import org.elasticsearch.index.translog.Translog; +import org.elasticsearch.transport.TransportResponse; import java.io.IOException; import java.util.List; @@ -41,7 +44,7 @@ public interface RecoveryTargetHandler { * new segments are available, and enables garbage collection of * tombstone files. The shard is also moved to the POST_RECOVERY phase during this time **/ - void finalizeRecovery(); + FinalizeResponse finalizeRecovery(); /** * Index a set of translog operations on the target @@ -71,4 +74,41 @@ public interface RecoveryTargetHandler { void writeFileChunk(StoreFileMetaData fileMetaData, long position, BytesReference content, boolean lastChunk, int totalTranslogOps) throws IOException; + class FinalizeResponse extends TransportResponse { + private long localCheckpoint; + private String allocationId; + + public FinalizeResponse(String allocationId, long localCheckpoint) { + this.localCheckpoint = localCheckpoint; + this.allocationId = allocationId; + } + + FinalizeResponse() { + + } + + public long getLocalCheckpoint() { + return localCheckpoint; + } + + public String getAllocationId() { + return allocationId; + } + + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeZLong(localCheckpoint); + out.writeString(allocationId); + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + localCheckpoint = in.readZLong(); + allocationId = in.readString(); + } + } + } diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetService.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetService.java index 53feb8e6135..bf5d1cb9516 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetService.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetService.java @@ -58,7 +58,6 @@ import org.elasticsearch.transport.TransportService; import java.io.IOException; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; -import java.util.function.Predicate; import static org.elasticsearch.common.unit.TimeValue.timeValueMillis; @@ -300,11 +299,12 @@ public class RecoveryTargetService extends AbstractComponent implements IndexEve @Override public void messageReceived(RecoveryFinalizeRecoveryRequest request, TransportChannel channel) throws Exception { + final RecoveryTargetHandler.FinalizeResponse response; try (RecoveriesCollection.RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId() )) { - recoveryRef.status().finalizeRecovery(); + response = recoveryRef.status().finalizeRecovery(); } - channel.sendResponse(TransportResponse.Empty.INSTANCE); + channel.sendResponse(response); } } diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java b/core/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java index edc6c520be0..88a30976144 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java @@ -27,6 +27,7 @@ import org.elasticsearch.index.store.Store; import org.elasticsearch.index.store.StoreFileMetaData; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.transport.EmptyTransportResponseHandler; +import org.elasticsearch.transport.FutureTransportResponseHandler; import org.elasticsearch.transport.TransportRequestOptions; import org.elasticsearch.transport.TransportService; @@ -82,11 +83,16 @@ public class RemoteRecoveryTargetHandler implements RecoveryTargetHandler { } @Override - public void finalizeRecovery() { - transportService.submitRequest(targetNode, RecoveryTargetService.Actions.FINALIZE, + public FinalizeResponse finalizeRecovery() { + return transportService.submitRequest(targetNode, RecoveryTargetService.Actions.FINALIZE, new RecoveryFinalizeRecoveryRequest(recoveryId, shardId), TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionLongTimeout()).build(), - EmptyTransportResponseHandler.INSTANCE_SAME).txGet(); + new FutureTransportResponseHandler() { + @Override + public FinalizeResponse newInstance() { + return new FinalizeResponse(); + } + }).txGet(); } @Override diff --git a/core/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java b/core/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java index cc7558d1de8..0331ae90764 100644 --- a/core/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java +++ b/core/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java @@ -39,7 +39,6 @@ import org.elasticsearch.index.shard.IndexShardNotStartedException; import org.elasticsearch.index.shard.IndexShardState; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.transport.TransportResponse; import java.io.IOException; import java.util.ArrayList; @@ -105,8 +104,9 @@ public class ReplicationOperationTests extends ESTestCase { PlainActionFuture listener = new PlainActionFuture<>(); final ClusterState finalState = state; final TestReplicaProxy replicasProxy = new TestReplicaProxy(expectedFailures); + final TestPrimary primary = new TestPrimary(primaryShard, primaryTerm); final TestReplicationOperation op = new TestReplicationOperation(request, - new TestPrimary(primaryShard, primaryTerm), listener, replicasProxy, () -> finalState); + primary, listener, replicasProxy, () -> finalState); op.execute(); assertThat(request.primaryTerm(), equalTo(primaryTerm)); @@ -122,6 +122,9 @@ public class ReplicationOperationTests extends ESTestCase { indexShardRoutingTable.shardsWithState(ShardRoutingState.UNASSIGNED); final int totalShards = 1 + expectedReplicas.size() + unassignedShards.size(); assertThat(shardInfo.getTotal(), equalTo(totalShards)); + + assertThat(primary.knownLocalCheckpoints.remove(primaryShard.allocationId().getId()), equalTo(primary.localCheckpoint)); + assertThat(primary.knownLocalCheckpoints, equalTo(replicasProxy.generatedLocalCheckpoints)); } @@ -368,10 +371,13 @@ public class ReplicationOperationTests extends ESTestCase { static class TestPrimary implements ReplicationOperation.Primary { final ShardRouting routing; final long term; + final long localCheckpoint; + final Map knownLocalCheckpoints = new HashMap<>(); TestPrimary(ShardRouting routing, long term) { this.routing = routing; this.term = term; + this.localCheckpoint = random().nextLong(); } @Override @@ -392,6 +398,36 @@ public class ReplicationOperationTests extends ESTestCase { request.primaryTerm(term); return new Tuple<>(new Response(), request); } + + @Override + public void updateLocalCheckpointForShard(String allocationId, long checkpoint) { + knownLocalCheckpoints.put(allocationId, checkpoint); + } + + @Override + public long localCheckpoint() { + return localCheckpoint; + } + } + + static class ReplicaResponse implements ReplicationOperation.ReplicaResponse { + final String allocationId; + final long localCheckpoint; + + ReplicaResponse(String allocationId, long localCheckpoint) { + this.allocationId = allocationId; + this.localCheckpoint = localCheckpoint; + } + + @Override + public long localCheckpoint() { + return localCheckpoint; + } + + @Override + public String allocationId() { + return allocationId; + } } static class TestReplicaProxy implements ReplicationOperation.Replicas { @@ -400,6 +436,8 @@ public class ReplicationOperationTests extends ESTestCase { final Set failedReplicas = ConcurrentCollections.newConcurrentSet(); + final Map generatedLocalCheckpoints = ConcurrentCollections.newConcurrentMap(); + TestReplicaProxy() { this(Collections.emptyMap()); } @@ -409,12 +447,16 @@ public class ReplicationOperationTests extends ESTestCase { } @Override - public void performOn(ShardRouting replica, Request request, ActionListener listener) { + public void performOn(ShardRouting replica, Request request, ActionListener listener) { assertTrue("replica request processed twice on [" + replica + "]", request.processedOnReplicas.add(replica)); if (opFailures.containsKey(replica)) { listener.onFailure(opFailures.get(replica)); } else { - listener.onResponse(TransportResponse.Empty.INSTANCE); + final long checkpoint = random().nextLong(); + final String allocationId = replica.allocationId().getId(); + Long existing = generatedLocalCheckpoints.put(allocationId, checkpoint); + assertNull(existing); + listener.onResponse(new ReplicaResponse(allocationId, checkpoint)); } } diff --git a/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java b/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java index 04306b0818c..1cdae0387d6 100644 --- a/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java @@ -24,6 +24,7 @@ import org.elasticsearch.action.ReplicationResponse; import org.elasticsearch.action.UnavailableShardsException; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.action.support.replication.ReplicationOperation.ReplicaResponse; import org.elasticsearch.client.transport.NoNodeAvailableException; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.action.shard.ShardStateAction; @@ -43,7 +44,6 @@ import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Nullable; -import org.elasticsearch.common.collect.Iterators; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -502,7 +502,7 @@ public class TransportReplicationActionTests extends ESTestCase { setState(clusterService, state); // check that at unknown node fails - PlainActionFuture listener = new PlainActionFuture<>(); + PlainActionFuture listener = new PlainActionFuture<>(); proxy.performOn( TestShardRouting.newShardRouting(shardId, "NOT THERE", false, randomFrom(ShardRoutingState.values())), new Request(), listener); @@ -519,9 +519,11 @@ public class TransportReplicationActionTests extends ESTestCase { CapturingTransport.CapturedRequest[] captures = transport.getCapturedRequestsAndClear(); assertThat(captures, arrayWithSize(1)); if (randomBoolean()) { - transport.handleResponse(captures[0].requestId, TransportResponse.Empty.INSTANCE); + final TransportReplicationAction.ReplicaResponse response = + new TransportReplicationAction.ReplicaResponse(randomAsciiOfLength(10), randomLong()); + transport.handleResponse(captures[0].requestId, response); assertTrue(listener.isDone()); - listener.get(); + assertThat(listener.get(), equalTo(response)); } else if (randomBoolean()) { transport.handleRemoteError(captures[0].requestId, new ElasticsearchException("simulated")); assertTrue(listener.isDone()); @@ -598,7 +600,7 @@ public class TransportReplicationActionTests extends ESTestCase { final ShardId shardId = new ShardId(index, "_na_", 0); // we use one replica to check the primary term was set on the operation and sent to the replica setState(clusterService, - state(index, true, ShardRoutingState.STARTED, randomFrom(ShardRoutingState.INITIALIZING, ShardRoutingState.STARTED))); + state(index, true, ShardRoutingState.STARTED, randomFrom(ShardRoutingState.INITIALIZING, ShardRoutingState.STARTED))); logger.debug("--> using initial state:\n{}", clusterService.state().prettyPrint()); Request request = new Request(shardId); PlainActionFuture listener = new PlainActionFuture<>(); diff --git a/core/src/test/java/org/elasticsearch/index/IndexModuleTests.java b/core/src/test/java/org/elasticsearch/index/IndexModuleTests.java index 7beeaaee0a6..19574fdd197 100644 --- a/core/src/test/java/org/elasticsearch/index/IndexModuleTests.java +++ b/core/src/test/java/org/elasticsearch/index/IndexModuleTests.java @@ -45,9 +45,9 @@ import org.elasticsearch.env.Environment; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.env.ShardLock; import org.elasticsearch.index.analysis.AnalysisRegistry; -import org.elasticsearch.index.cache.query.QueryCache; -import org.elasticsearch.index.cache.query.IndexQueryCache; import org.elasticsearch.index.cache.query.DisabledQueryCache; +import org.elasticsearch.index.cache.query.IndexQueryCache; +import org.elasticsearch.index.cache.query.QueryCache; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.EngineException; import org.elasticsearch.index.fielddata.IndexFieldDataCache; @@ -61,9 +61,9 @@ import org.elasticsearch.index.similarity.SimilarityService; import org.elasticsearch.index.store.IndexStore; import org.elasticsearch.index.store.IndexStoreConfig; import org.elasticsearch.indices.IndicesModule; +import org.elasticsearch.indices.IndicesQueryCache; import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; -import org.elasticsearch.indices.IndicesQueryCache; import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache; import org.elasticsearch.indices.mapper.MapperRegistry; import org.elasticsearch.indices.query.IndicesQueriesRegistry; @@ -88,8 +88,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; -import static java.util.Collections.emptyMap; - public class IndexModuleTests extends ESTestCase { private Index index; private Settings settings; @@ -154,7 +152,8 @@ public class IndexModuleTests extends ESTestCase { IndexModule module = new IndexModule(indexSettings, null, new AnalysisRegistry(null, environment)); module.setSearcherWrapper((s) -> new Wrapper()); module.engineFactory.set(new MockEngineFactory(AssertingDirectoryReader.class)); - IndexService indexService = module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, indicesQueryCache, mapperRegistry, new IndicesFieldDataCache(settings, listener)); + IndexService indexService = module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, indicesQueryCache, + mapperRegistry, shardId -> {} ,new IndicesFieldDataCache(settings, listener)); assertTrue(indexService.getSearcherWrapper() instanceof Wrapper); assertSame(indexService.getEngineFactory(), module.engineFactory.get()); indexService.close("simon says", false); @@ -177,7 +176,8 @@ public class IndexModuleTests extends ESTestCase { } catch (IllegalArgumentException ex) { // fine } - IndexService indexService = module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, indicesQueryCache, mapperRegistry, new IndicesFieldDataCache(settings, listener)); + IndexService indexService = module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, indicesQueryCache, + mapperRegistry, shardId -> {}, new IndicesFieldDataCache(settings, listener)); assertTrue(indexService.getIndexStore() instanceof FooStore); indexService.close("simon says", false); @@ -196,7 +196,7 @@ public class IndexModuleTests extends ESTestCase { Consumer listener = (s) -> {}; module.addIndexEventListener(eventListener); IndexService indexService = module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, indicesQueryCache, mapperRegistry, - new IndicesFieldDataCache(settings, this.listener)); + shardId -> {}, new IndicesFieldDataCache(settings, this.listener)); IndexSettings x = indexService.getIndexSettings(); assertEquals(x.getSettings().getAsMap(), indexSettings.getSettings().getAsMap()); assertEquals(x.getIndex(), index); @@ -221,7 +221,7 @@ public class IndexModuleTests extends ESTestCase { } IndexService indexService = module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, indicesQueryCache, mapperRegistry, - new IndicesFieldDataCache(settings, listener)); + shardId -> {}, new IndicesFieldDataCache(settings, listener)); assertSame(booleanSetting, indexService.getIndexSettings().getScopedSettings().get(booleanSetting.getKey())); indexService.close("simon says", false); @@ -244,7 +244,7 @@ public class IndexModuleTests extends ESTestCase { IndexService indexService = module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, indicesQueryCache, mapperRegistry, - new IndicesFieldDataCache(settings, this.listener)); + shardId -> {}, new IndicesFieldDataCache(settings, this.listener)); assertEquals(2, indexService.getIndexOperationListeners().size()); assertEquals(IndexingSlowLog.class, indexService.getIndexOperationListeners().get(0).getClass()); assertSame(listener, indexService.getIndexOperationListeners().get(1)); @@ -274,7 +274,7 @@ public class IndexModuleTests extends ESTestCase { IndexService indexService = module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, indicesQueryCache, mapperRegistry, - new IndicesFieldDataCache(settings, this.listener)); + shardId -> {}, new IndicesFieldDataCache(settings, this.listener)); assertEquals(2, indexService.getSearchOperationListener().size()); assertEquals(SearchSlowLog.class, indexService.getSearchOperationListener().get(0).getClass()); assertSame(listener, indexService.getSearchOperationListener().get(1)); @@ -307,7 +307,7 @@ public class IndexModuleTests extends ESTestCase { }); IndexService indexService = module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, indicesQueryCache, mapperRegistry, - new IndicesFieldDataCache(settings, listener)); + shardId -> {}, new IndicesFieldDataCache(settings, listener)); SimilarityService similarityService = indexService.similarityService(); assertNotNull(similarityService.getSimilarity("my_similarity")); assertTrue(similarityService.getSimilarity("my_similarity").get() instanceof TestSimilarity); @@ -338,7 +338,7 @@ public class IndexModuleTests extends ESTestCase { IndexModule module = new IndexModule(IndexSettingsModule.newIndexSettings("foo", indexSettings), null, new AnalysisRegistry(null, environment)); try { module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, indicesQueryCache, mapperRegistry, - new IndicesFieldDataCache(settings, listener)); + shardId -> {}, new IndicesFieldDataCache(settings, listener)); } catch (IllegalArgumentException ex) { assertEquals("Unknown Similarity type [test_similarity] for [my_similarity]", ex.getMessage()); } @@ -353,7 +353,7 @@ public class IndexModuleTests extends ESTestCase { IndexModule module = new IndexModule(IndexSettingsModule.newIndexSettings("foo", indexSettings), null, new AnalysisRegistry(null, environment)); try { module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, indicesQueryCache, mapperRegistry, - new IndicesFieldDataCache(settings, listener)); + shardId -> {}, new IndicesFieldDataCache(settings, listener)); } catch (IllegalArgumentException ex) { assertEquals("Similarity [my_similarity] must have an associated type", ex.getMessage()); } @@ -367,7 +367,7 @@ public class IndexModuleTests extends ESTestCase { module.forceQueryCacheProvider((a, b) -> new CustomQueryCache()); expectThrows(AlreadySetException.class, () -> module.forceQueryCacheProvider((a, b) -> new CustomQueryCache())); IndexService indexService = module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, indicesQueryCache, mapperRegistry, - new IndicesFieldDataCache(settings, listener)); + shardId -> {}, new IndicesFieldDataCache(settings, listener)); assertTrue(indexService.cache().query() instanceof CustomQueryCache); indexService.close("simon says", false); } @@ -378,7 +378,7 @@ public class IndexModuleTests extends ESTestCase { .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT).build(); IndexModule module = new IndexModule(IndexSettingsModule.newIndexSettings("foo", indexSettings), null, new AnalysisRegistry(null, environment)); IndexService indexService = module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, indicesQueryCache, mapperRegistry, - new IndicesFieldDataCache(settings, listener)); + shardId -> {}, new IndicesFieldDataCache(settings, listener)); assertTrue(indexService.cache().query() instanceof IndexQueryCache); indexService.close("simon says", false); } @@ -391,7 +391,7 @@ public class IndexModuleTests extends ESTestCase { IndexModule module = new IndexModule(IndexSettingsModule.newIndexSettings("foo", indexSettings), null, new AnalysisRegistry(null, environment)); module.forceQueryCacheProvider((a, b) -> new CustomQueryCache()); IndexService indexService = module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, indicesQueryCache, mapperRegistry, - new IndicesFieldDataCache(settings, listener)); + shardId -> {}, new IndicesFieldDataCache(settings, listener)); assertTrue(indexService.cache().query() instanceof DisabledQueryCache); indexService.close("simon says", false); } diff --git a/core/src/test/java/org/elasticsearch/index/IndexServiceTests.java b/core/src/test/java/org/elasticsearch/index/IndexServiceTests.java index 97258b12a3b..b27f50054ef 100644 --- a/core/src/test/java/org/elasticsearch/index/IndexServiceTests.java +++ b/core/src/test/java/org/elasticsearch/index/IndexServiceTests.java @@ -280,6 +280,20 @@ public class IndexServiceTests extends ESSingleNodeTestCase { assertNull(indexService.getFsyncTask()); } + public void testGlobalCheckpointTaskIsRunning() throws IOException { + IndexService indexService = createIndex("test", Settings.EMPTY); + IndexService.AsyncGlobalCheckpointTask task = indexService.getGlobalCheckpointTask(); + assertNotNull(task); + assertEquals(IndexSettings.INDEX_SEQ_NO_CHECKPOINT_SYNC_INTERVAL.getDefault(Settings.EMPTY), task.getInterval()); + assertTrue(task.mustReschedule()); + assertTrue(task.isScheduled()); + + indexService.close("simon says", false); + assertFalse(task.isScheduled()); + assertTrue(task.isClosed()); + } + + public void testRefreshActuallyWorks() throws Exception { IndexService indexService = createIndex("test", Settings.EMPTY); ensureGreen("test"); diff --git a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 0d603eb8a77..d1b87d8b0d6 100644 --- a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -20,7 +20,6 @@ package org.elasticsearch.index.engine; import com.carrotsearch.hppc.cursors.ObjectObjectCursor; - import org.apache.log4j.AppenderSkeleton; import org.apache.log4j.Level; import org.apache.log4j.LogManager; @@ -1295,13 +1294,13 @@ public class InternalEngineTests extends ESTestCase { public void testVersioningCreateExistsExceptionWithFlush() { ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocument(), B_1, null); - Engine.Index create = new Engine.Index(newUid("1"), doc, -1, Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, 0); + Engine.Index create = new Engine.Index(newUid("1"), doc, SequenceNumbersService.UNASSIGNED_SEQ_NO, Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, 0); engine.index(create); assertThat(create.version(), equalTo(1L)); engine.flush(); - create = new Engine.Index(newUid("1"), doc, -1, Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, 0); + create = new Engine.Index(newUid("1"), doc, SequenceNumbersService.UNASSIGNED_SEQ_NO, Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, 0); try { engine.index(create); fail(); @@ -1520,10 +1519,10 @@ public class InternalEngineTests extends ESTestCase { } } } - assertThat(engine.seqNoStats().getMaxSeqNo(), equalTo(seqNoCount)); - assertThat(engine.seqNoStats().getLocalCheckpoint(), equalTo(seqNoCount)); - assertThat(replicaEngine.seqNoStats().getMaxSeqNo(), equalTo(seqNoCount)); - assertThat(replicaEngine.seqNoStats().getLocalCheckpoint(), equalTo(seqNoCount)); + assertThat(engine.seqNoService().stats().getMaxSeqNo(), equalTo(seqNoCount)); + assertThat(engine.seqNoService().stats().getLocalCheckpoint(), equalTo(seqNoCount)); + assertThat(replicaEngine.seqNoService().stats().getMaxSeqNo(), equalTo(seqNoCount)); + assertThat(replicaEngine.seqNoService().stats().getLocalCheckpoint(), equalTo(seqNoCount)); } // #8603: make sure we can separately log IFD's messages diff --git a/core/src/test/java/org/elasticsearch/index/seqno/CheckpointsIT.java b/core/src/test/java/org/elasticsearch/index/seqno/CheckpointsIT.java new file mode 100644 index 00000000000..ca172f8683c --- /dev/null +++ b/core/src/test/java/org/elasticsearch/index/seqno/CheckpointsIT.java @@ -0,0 +1,79 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.index.seqno; + +import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse; +import org.elasticsearch.action.admin.indices.stats.ShardStats; +import org.elasticsearch.action.index.IndexRequestBuilder; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentHelper; +import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.test.junit.annotations.TestLogging; +import org.hamcrest.Matcher; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import static org.hamcrest.Matchers.anyOf; +import static org.hamcrest.Matchers.equalTo; + +@TestLogging("index.shard:TRACE,index.seqno:TRACE") +public class CheckpointsIT extends ESIntegTestCase { + + public void testCheckpointsAdvance() throws Exception { + prepareCreate("test").setSettings( + "index.seq_no.checkpoint_sync_interval", "100ms", // update global point frequently + "index.number_of_shards", "1" // simplify things so we know how many ops goes to the shards + ).get(); + final List builders = new ArrayList<>(); + final int numDocs = scaledRandomIntBetween(0, 100); + logger.info("--> will index [{}] docs", numDocs); + for (int i = 0; i < numDocs; i++) { + builders.add(client().prepareIndex("test", "type", "id_" + i).setSource("{}")); + } + indexRandom(randomBoolean(), false, builders); + + assertBusy(() -> { + IndicesStatsResponse stats = client().admin().indices().prepareStats("test").clear().get(); + for (ShardStats shardStats : stats.getShards()) { + if (shardStats.getSeqNoStats() == null) { + assertFalse("no seq_no stats for primary " + shardStats.getShardRouting(), shardStats.getShardRouting().primary()); + continue; + } + logger.debug("seq_no stats for {}: {}", shardStats.getShardRouting(), + XContentHelper.toString(shardStats.getSeqNoStats(), + new ToXContent.MapParams(Collections.singletonMap("pretty", "false")))); + final Matcher localCheckpointRule; + if (shardStats.getShardRouting().primary()) { + localCheckpointRule = equalTo(numDocs - 1L); + } else { + // nocommit: recovery doesn't transfer local checkpoints yet (we don't persist them in lucene). + localCheckpointRule = anyOf(equalTo(numDocs - 1L), equalTo(SequenceNumbersService.NO_OPS_PERFORMED)); + } + assertThat(shardStats.getShardRouting() + " local checkpoint mismatch", + shardStats.getSeqNoStats().getLocalCheckpoint(), localCheckpointRule); + assertThat(shardStats.getShardRouting() + " global checkpoint mismatch", + shardStats.getSeqNoStats().getGlobalCheckpoint(), equalTo(numDocs - 1L)); + assertThat(shardStats.getShardRouting() + " max seq no mismatch", + shardStats.getSeqNoStats().getMaxSeqNo(), equalTo(numDocs - 1L)); + } + }); + } +} diff --git a/core/src/test/java/org/elasticsearch/index/seqno/GlobalCheckpointTests.java b/core/src/test/java/org/elasticsearch/index/seqno/GlobalCheckpointTests.java new file mode 100644 index 00000000000..557f0ca1996 --- /dev/null +++ b/core/src/test/java/org/elasticsearch/index/seqno/GlobalCheckpointTests.java @@ -0,0 +1,143 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.index.seqno; + +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.IndexSettingsModule; +import org.junit.Before; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; + +public class GlobalCheckpointTests extends ESTestCase { + + GlobalCheckpointService checkpointService; + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + checkpointService = new GlobalCheckpointService(new ShardId("test", "_na_", 0), + IndexSettingsModule.newIndexSettings("test", Settings.EMPTY)); + } + + public void testEmptyShards() { + assertFalse("checkpoint shouldn't be updated when the are no active shards", checkpointService.updateCheckpointOnPrimary()); + assertThat(checkpointService.getCheckpoint(), equalTo(SequenceNumbersService.UNASSIGNED_SEQ_NO)); + } + + public void testGlobalCheckpointUpdate() { + Map allocations = new HashMap<>(); + Set active = new HashSet<>(); + Set insync = new HashSet<>(); + Set tracking = new HashSet<>(); + long maxLocalCheckpoint = Long.MAX_VALUE; + for (int i = randomIntBetween(3, 10); i > 0; i--) { + String id = "id_" + i + "_" + randomAsciiOfLength(5); + long localCheckpoint = randomInt(200); + switch (randomInt(2)) { + case 0: + active.add(id); + maxLocalCheckpoint = Math.min(maxLocalCheckpoint, localCheckpoint); + break; + case 1: + insync.add(id); + maxLocalCheckpoint = Math.min(maxLocalCheckpoint, localCheckpoint); + break; + case 2: + tracking.add(id); + break; + default: + throw new IllegalStateException("you messed up your numbers, didn't you?"); + } + allocations.put(id, localCheckpoint); + } + + if (maxLocalCheckpoint == Long.MAX_VALUE) { + // note: this state can not happen in practice as we always have at least one primary shard active/in sync + // it is however nice not to assume this on this level and check we do the right thing. + maxLocalCheckpoint = SequenceNumbersService.UNASSIGNED_SEQ_NO; + } + + assertThat(checkpointService.getCheckpoint(), equalTo(SequenceNumbersService.UNASSIGNED_SEQ_NO)); + + logger.info("--> using allocations"); + allocations.keySet().stream().forEach(aId -> { + final String type; + if (active.contains(aId)) { + type = "active"; + } else if (insync.contains(aId)) { + type = "insync"; + } else if (tracking.contains(aId)) { + type = "tracked"; + } else { + throw new IllegalStateException(aId + " not found in any map"); + } + logger.info(" - [{}], local checkpoint [{}], [{}]", aId, allocations.get(aId), type); + }); + + Set initializing = new HashSet<>(insync); + initializing.addAll(tracking); + + checkpointService.updateAllocationIdsFromMaster(active, initializing); + allocations.keySet().stream().forEach(aId -> checkpointService.updateLocalCheckpoint(aId, allocations.get(aId))); + + // make sure insync allocation count + insync.stream().forEach(aId -> checkpointService.markAllocationIdAsInSync(aId, randomBoolean() ? 0 : allocations.get(aId))); + + assertThat(checkpointService.getCheckpoint(), equalTo(SequenceNumbersService.UNASSIGNED_SEQ_NO)); + + assertThat(checkpointService.updateCheckpointOnPrimary(), equalTo(maxLocalCheckpoint != SequenceNumbersService.UNASSIGNED_SEQ_NO)); + assertThat(checkpointService.getCheckpoint(), equalTo(maxLocalCheckpoint)); + + // increment checkpoints + active.stream().forEach(aId -> allocations.put(aId, allocations.get(aId) + 1 + randomInt(4))); + insync.stream().forEach(aId -> allocations.put(aId, allocations.get(aId) + 1 + randomInt(4))); + allocations.keySet().stream().forEach(aId -> checkpointService.updateLocalCheckpoint(aId, allocations.get(aId))); + + // now insert an unknown active/insync id , the checkpoint shouldn't change but a refresh should be requested. + final String extraId = "extra_" + randomAsciiOfLength(5); + + // first check that adding it without the master blessing doesn't change anything. + checkpointService.updateLocalCheckpoint(extraId, maxLocalCheckpoint + 1 + randomInt(4)); + assertThat(checkpointService.getLocalCheckpointForAllocation(extraId), equalTo(SequenceNumbersService.UNASSIGNED_SEQ_NO)); + + Set newActive = new HashSet<>(active); + newActive.add(extraId); + checkpointService.updateAllocationIdsFromMaster(newActive, initializing); + + // we should ask for a refresh , but not update the checkpoint + assertTrue(checkpointService.updateCheckpointOnPrimary()); + assertThat(checkpointService.getCheckpoint(), equalTo(maxLocalCheckpoint)); + + // now notify for the new id + checkpointService.updateLocalCheckpoint(extraId, maxLocalCheckpoint + 1 + randomInt(4)); + + // now it should be incremented + assertTrue(checkpointService.updateCheckpointOnPrimary()); + assertThat(checkpointService.getCheckpoint(), greaterThan(maxLocalCheckpoint)); + } +} diff --git a/core/src/test/java/org/elasticsearch/index/seqno/LocalCheckpointServiceTests.java b/core/src/test/java/org/elasticsearch/index/seqno/LocalCheckpointServiceTests.java index 79944eb149d..e040d813934 100644 --- a/core/src/test/java/org/elasticsearch/index/seqno/LocalCheckpointServiceTests.java +++ b/core/src/test/java/org/elasticsearch/index/seqno/LocalCheckpointServiceTests.java @@ -40,9 +40,9 @@ import static org.hamcrest.Matchers.isOneOf; public class LocalCheckpointServiceTests extends ESTestCase { - LocalCheckpointService checkpointService; + private LocalCheckpointService checkpointService; - final int SMALL_CHUNK_SIZE = 4; + private final int SMALL_CHUNK_SIZE = 4; @Override @Before @@ -51,19 +51,19 @@ public class LocalCheckpointServiceTests extends ESTestCase { checkpointService = getCheckpointService(); } - protected LocalCheckpointService getCheckpointService() { + private LocalCheckpointService getCheckpointService() { return new LocalCheckpointService( new ShardId("test", "_na_", 0), IndexSettingsModule.newIndexSettings("test", Settings.builder() - .put(LocalCheckpointService.SETTINGS_BIT_ARRAYS_SIZE, SMALL_CHUNK_SIZE) + .put(LocalCheckpointService.SETTINGS_BIT_ARRAYS_SIZE.getKey(), SMALL_CHUNK_SIZE) .build() )); } public void testSimplePrimary() { long seqNo1, seqNo2; - assertThat(checkpointService.getCheckpoint(), equalTo(SequenceNumbersService.UNASSIGNED_SEQ_NO)); + assertThat(checkpointService.getCheckpoint(), equalTo(SequenceNumbersService.NO_OPS_PERFORMED)); seqNo1 = checkpointService.generateSeqNo(); assertThat(seqNo1, equalTo(0L)); checkpointService.markSeqNoAsCompleted(seqNo1); @@ -79,7 +79,7 @@ public class LocalCheckpointServiceTests extends ESTestCase { } public void testSimpleReplica() { - assertThat(checkpointService.getCheckpoint(), equalTo(SequenceNumbersService.UNASSIGNED_SEQ_NO)); + assertThat(checkpointService.getCheckpoint(), equalTo(SequenceNumbersService.NO_OPS_PERFORMED)); checkpointService.markSeqNoAsCompleted(0L); assertThat(checkpointService.getCheckpoint(), equalTo(0L)); checkpointService.markSeqNoAsCompleted(2L); diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 0e3c77d9b77..07adb4d4acc 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -1414,7 +1414,8 @@ public class IndexShardTests extends ESSingleNodeTestCase { IndexShard newShard = new IndexShard(initializingShardRouting, indexService.getIndexSettings(), shard.shardPath(), shard.store(), indexService.cache(), indexService.mapperService(), indexService.similarityService(), indexService.fieldData(), shard.getEngineFactory(), indexService.getIndexEventListener(), wrapper, - indexService.getThreadPool(), indexService.getBigArrays(), null, Collections.emptyList(), Arrays.asList(listeners) + indexService.getThreadPool(), indexService.getBigArrays(), null, shard.getGlobalCheckpointSyncer(), + Collections.emptyList(), Arrays.asList(listeners) ); return newShard; } diff --git a/core/src/test/java/org/elasticsearch/indices/IndicesLifecycleListenerSingleNodeTests.java b/core/src/test/java/org/elasticsearch/indices/IndicesLifecycleListenerSingleNodeTests.java index bbb1e8bf2bf..4370553f8e3 100644 --- a/core/src/test/java/org/elasticsearch/indices/IndicesLifecycleListenerSingleNodeTests.java +++ b/core/src/test/java/org/elasticsearch/indices/IndicesLifecycleListenerSingleNodeTests.java @@ -95,7 +95,7 @@ public class IndicesLifecycleListenerSingleNodeTests extends ESSingleNodeTestCas indicesService.deleteIndex(idx, "simon says"); try { NodeServicesProvider nodeServicesProvider = getInstanceFromNode(NodeServicesProvider.class); - IndexService index = indicesService.createIndex(nodeServicesProvider, metaData, Arrays.asList(countingListener)); + IndexService index = indicesService.createIndex(nodeServicesProvider, metaData, Arrays.asList(countingListener), s -> {}); idx = index.index(); ShardRouting newRouting = shardRouting; String nodeId = newRouting.currentNodeId(); diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java index 995ca5480f3..b0ca2e7f561 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java +++ b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java @@ -80,7 +80,6 @@ import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache; import org.elasticsearch.indices.recovery.RecoverySettings; import org.elasticsearch.node.MockNode; import org.elasticsearch.node.Node; -import org.elasticsearch.node.internal.InternalSettingsPreparer; import org.elasticsearch.node.service.NodeService; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.script.ScriptService; @@ -162,7 +161,9 @@ public final class InternalTestCluster extends TestCluster { private static final int JVM_ORDINAL = Integer.parseInt(System.getProperty(SysGlobals.CHILDVM_SYSPROP_JVM_ID, "0")); - /** a per-JVM unique offset to be used for calculating unique port ranges. */ + /** + * a per-JVM unique offset to be used for calculating unique port ranges. + */ public static final int JVM_BASE_PORT_OFFSET = PORTS_PER_JVM * (JVM_ORDINAL + 1); private static final AtomicInteger clusterOrdinal = new AtomicInteger(); @@ -349,7 +350,7 @@ public final class InternalTestCluster extends TestCluster { private Settings getSettings(int nodeOrdinal, long nodeSeed, Settings others) { Builder builder = Settings.builder().put(defaultSettings) - .put(getRandomNodeSettings(nodeSeed)); + .put(getRandomNodeSettings(nodeSeed)); Settings settings = nodeConfigurationSource.nodeSettings(nodeOrdinal); if (settings != null) { if (settings.get(ClusterName.CLUSTER_NAME_SETTING.getKey()) != null) { @@ -536,7 +537,7 @@ public final class InternalTestCluster extends TestCluster { } // prevent killing the master if possible and client nodes final Stream collection = - n == 0 ? nodes.values().stream() : nodes.values().stream().filter(new DataNodePredicate().and(new MasterNodePredicate(getMasterName()).negate())); + n == 0 ? nodes.values().stream() : nodes.values().stream().filter(new DataNodePredicate().and(new MasterNodePredicate(getMasterName()).negate())); final Iterator values = collection.iterator(); logger.info("changing cluster size from {} to {}, {} data nodes", size(), n + numShareCoordOnlyNodes, n); @@ -574,11 +575,11 @@ public final class InternalTestCluster extends TestCluster { String name = buildNodeName(nodeId); assert !nodes.containsKey(name); Settings finalSettings = Settings.builder() - .put(Environment.PATH_HOME_SETTING.getKey(), baseDir) // allow overriding path.home - .put(settings) - .put("node.name", name) - .put(DiscoveryNodeService.NODE_ID_SEED_SETTING.getKey(), seed) - .build(); + .put(Environment.PATH_HOME_SETTING.getKey(), baseDir) // allow overriding path.home + .put(settings) + .put("node.name", name) + .put(DiscoveryNodeService.NODE_ID_SEED_SETTING.getKey(), seed) + .build(); MockNode node = new MockNode(finalSettings, version, plugins); return new NodeAndClient(name, node); } @@ -874,14 +875,14 @@ public final class InternalTestCluster extends TestCluster { TransportAddress addr = node.injector().getInstance(TransportService.class).boundAddress().publishAddress(); Settings nodeSettings = node.settings(); Builder builder = Settings.builder() - .put("client.transport.nodes_sampler_interval", "1s") - .put(Environment.PATH_HOME_SETTING.getKey(), baseDir) - .put("node.name", TRANSPORT_CLIENT_PREFIX + node.settings().get("node.name")) - .put(ClusterName.CLUSTER_NAME_SETTING.getKey(), clusterName).put("client.transport.sniff", sniff) - .put(Node.NODE_MODE_SETTING.getKey(), Node.NODE_MODE_SETTING.exists(nodeSettings) ? Node.NODE_MODE_SETTING.get(nodeSettings) : nodeMode) - .put("logger.prefix", nodeSettings.get("logger.prefix", "")) - .put("logger.level", nodeSettings.get("logger.level", "INFO")) - .put(settings); + .put("client.transport.nodes_sampler_interval", "1s") + .put(Environment.PATH_HOME_SETTING.getKey(), baseDir) + .put("node.name", TRANSPORT_CLIENT_PREFIX + node.settings().get("node.name")) + .put(ClusterName.CLUSTER_NAME_SETTING.getKey(), clusterName).put("client.transport.sniff", sniff) + .put(Node.NODE_MODE_SETTING.getKey(), Node.NODE_MODE_SETTING.exists(nodeSettings) ? Node.NODE_MODE_SETTING.get(nodeSettings) : nodeMode) + .put("logger.prefix", nodeSettings.get("logger.prefix", "")) + .put("logger.level", nodeSettings.get("logger.level", "INFO")) + .put(settings); if (Node.NODE_LOCAL_SETTING.exists(nodeSettings)) { builder.put(Node.NODE_LOCAL_SETTING.getKey(), Node.NODE_LOCAL_SETTING.get(nodeSettings)); @@ -1028,7 +1029,14 @@ public final class InternalTestCluster extends TestCluster { IndicesService indexServices = getInstance(IndicesService.class, nodeAndClient.name); for (IndexService indexService : indexServices) { for (IndexShard indexShard : indexService) { - assertThat("index shard counter on shard " + indexShard.shardId() + " on node " + nodeAndClient.name + " not 0", indexShard.getActiveOperationsCount(), equalTo(0)); + // we assert busy as we can have background global checkpoint activity + try { + assertBusy(() -> { + assertThat("index shard counter on shard " + indexShard.shardId() + " on node " + nodeAndClient.name + " not 0", indexShard.getActiveOperationsCount(), equalTo(0)); + }); + } catch (Exception e) { + throw new RuntimeException("unexpected error while checking for shard counters", e); + } } } } @@ -1377,11 +1385,11 @@ public final class InternalTestCluster extends TestCluster { private synchronized Set nRandomDataNodes(int numNodes) { assert size() >= numNodes; Map dataNodes = - nodes - .entrySet() - .stream() - .filter(new EntryNodePredicate(new DataNodePredicate())) - .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + nodes + .entrySet() + .stream() + .filter(new EntryNodePredicate(new DataNodePredicate())) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); final HashSet set = new HashSet<>(); final Iterator iterator = dataNodes.keySet().iterator(); for (int i = 0; i < numNodes; i++) { @@ -1634,10 +1642,10 @@ public final class InternalTestCluster extends TestCluster { private synchronized Collection filterNodes(Map map, Predicate predicate) { return map - .values() - .stream() - .filter(predicate) - .collect(Collectors.toCollection(ArrayList::new)); + .values() + .stream() + .filter(predicate) + .collect(Collectors.toCollection(ArrayList::new)); } private static final class DataNodePredicate implements Predicate { @@ -1651,7 +1659,7 @@ public final class InternalTestCluster extends TestCluster { @Override public boolean test(NodeAndClient nodeAndClient) { return DiscoveryNode.isDataNode(nodeAndClient.node.settings()) || - DiscoveryNode.isMasterNode(nodeAndClient.node.settings()); + DiscoveryNode.isMasterNode(nodeAndClient.node.settings()); } } @@ -1889,6 +1897,7 @@ public final class InternalTestCluster extends TestCluster { /** * Simple interface that allows to wait for an async operation to finish + * * @param the result of the async execution */ public interface Async {