Introduce global checkpoint background sync

It is the exciting return of the global checkpoint background
sync. Long, long ago, in a snapshot version far, far away, we had one
and only one global checkpoint background sync. This sync would fire
periodically and send the global checkpoint from the primary shard to
the replicas so that they could update their local knowledge of the
global checkpoint. Later, as we sped ahead towards finalizing the
initial version of sequence IDs, we realized that we needed the global
checkpoint updates to be inline. This meant that on a replication
operation, the primary shard would piggyback the global checkpoint on
the replication operation to the replicas. The replicas would update
their local knowledge of the global checkpoint and reply with their
local checkpoint. However, this could allow the global checkpoint on the
primary to advance again and the replicas would fall behind in their
local knowledge of the global checkpoint. If no further replication
operation ever fired, the replicas would be permanently behind. To
account for this, we added one more sync that would fire when the
primary shard fell idle. However, this approach has problems:
 - the shard idle timer defaults to five minutes, a long time to wait
   for the replicas to learn of the new global checkpoint
 - if a replica misses the sync, there is no follow-up sync to catch
   it up
 - there is an inherent race condition where the primary shard could
   fall idle mid-operation (after having sent the replication request to
   the replicas); in this case, there would never be a background sync
   after the operation completes
 - tying the global checkpoint sync to the idle timer was never natural

To fix this, we make two additional changes to sync the global
checkpoint to the replicas. The first is a post-operation
sync that only fires if there are no operations in flight and there is a
lagging replica. This gives us a chance to sync the global checkpoint to
the replicas immediately after an operation so that they are always kept
up to date. The second is that we add back a global checkpoint
background sync that fires on a timer. This timer fires every thirty
seconds and is not configurable (for simplicity). This background sync
is smarter than what we had previously in that it only sends a sync
if the global checkpoint on at least one replica lags behind that of
the primary. When the timer fires, we can compare the global checkpoint
on the primary to its knowledge of the global checkpoint on the replicas
and only send a sync if there is a shard behind.
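
In essence, the decision the primary makes reduces to the following
check (a minimal sketch mirroring IndexShard#maybeSyncGlobalCheckpoint
in the diff below; the class and method names here are illustrative,
and the map is hppc's ObjectLongMap as used by the tracker):

    import com.carrotsearch.hppc.ObjectLongMap;
    import java.util.stream.StreamSupport;

    class GlobalCheckpointSyncDecision {
        static boolean syncNeeded(
                final long maxSeqNo,
                final long globalCheckpoint,
                final ObjectLongMap<String> globalCheckpointsByAllocationId) {
            // no operations in flight: every operation is fully replicated, so
            // the maximum sequence number and the global checkpoint coincide
            if (maxSeqNo != globalCheckpoint) {
                return false;
            }
            // sync only if the primary's knowledge of the global checkpoint on
            // at least one in-sync copy lags the primary's global checkpoint
            return StreamSupport
                    .stream(globalCheckpointsByAllocationId.values().spliterator(), false)
                    .anyMatch(v -> v.value < globalCheckpoint);
        }
    }

Both the post-operation sync and the background sync gate on this same
check, so a sync is only sent when it would actually advance a replica.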

Relates #26591
Jason Tedor 2017-09-21 15:34:13 -04:00 committed by GitHub
parent c760eec054
commit f35d1de502
24 changed files with 676 additions and 175 deletions

View File

@@ -186,7 +186,7 @@ task verifyVersions {
* after the backport of the backcompat code is complete.
*/
allprojects {
ext.bwc_tests_enabled = true
ext.bwc_tests_enabled = false
}
task verifyBwcTestsEnabled {

View File

@@ -20,7 +20,9 @@
package org.elasticsearch.action.support.replication;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.store.AlreadyClosedException;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionListenerResponseHandler;
@@ -55,6 +57,7 @@ import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardClosedException;
import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.shard.ReplicationGroup;
import org.elasticsearch.index.shard.ShardId;
@@ -108,12 +111,26 @@ public abstract class TransportReplicationAction<
protected final String transportReplicaAction;
protected final String transportPrimaryAction;
private final boolean syncGlobalCheckpointAfterOperation;
protected TransportReplicationAction(Settings settings, String actionName, TransportService transportService,
ClusterService clusterService, IndicesService indicesService,
ThreadPool threadPool, ShardStateAction shardStateAction,
ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, Supplier<Request> request,
Supplier<ReplicaRequest> replicaRequest, String executor) {
this(settings, actionName, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters,
indexNameExpressionResolver, request, replicaRequest, executor, false);
}
protected TransportReplicationAction(Settings settings, String actionName, TransportService transportService,
ClusterService clusterService, IndicesService indicesService,
ThreadPool threadPool, ShardStateAction shardStateAction,
ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, Supplier<Request> request,
Supplier<ReplicaRequest> replicaRequest, String executor,
boolean syncGlobalCheckpointAfterOperation) {
super(settings, actionName, threadPool, actionFilters, indexNameExpressionResolver, transportService.getTaskManager());
this.transportService = transportService;
this.clusterService = clusterService;
@@ -126,6 +143,8 @@ public abstract class TransportReplicationAction<
registerRequestHandlers(actionName, transportService, request, replicaRequest, executor);
this.transportOptions = transportOptions();
this.syncGlobalCheckpointAfterOperation = syncGlobalCheckpointAfterOperation;
}
protected void registerRequestHandlers(String actionName, TransportService transportService, Supplier<Request> request,
@@ -150,7 +169,7 @@ public abstract class TransportReplicationAction<
new ReroutePhase((ReplicationTask) task, request, listener).run();
}
protected ReplicationOperation.Replicas newReplicasProxy(long primaryTerm) {
protected ReplicationOperation.Replicas<ReplicaRequest> newReplicasProxy(long primaryTerm) {
return new ReplicasProxy(primaryTerm);
}
@@ -359,6 +378,17 @@ public abstract class TransportReplicationAction<
return new ActionListener<Response>() {
@Override
public void onResponse(Response response) {
if (syncGlobalCheckpointAfterOperation) {
try {
primaryShardReference.indexShard.maybeSyncGlobalCheckpoint("post-operation");
} catch (final Exception e) {
// only log non-closed exceptions
if (ExceptionsHelper.unwrap(e, AlreadyClosedException.class, IndexShardClosedException.class) == null) {
logger.info("post-operation global checkpoint sync failed", e);
// intentionally swallow, a missed global checkpoint sync should not fail this operation
}
}
}
primaryShardReference.close(); // release shard operation lock before responding to caller
setPhase(replicationTask, "finished");
try {

View File

@@ -71,7 +71,7 @@ public abstract class TransportWriteAction<
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, Supplier<Request> request,
Supplier<ReplicaRequest> replicaRequest, String executor) {
super(settings, actionName, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters,
indexNameExpressionResolver, request, replicaRequest, executor);
indexNameExpressionResolver, request, replicaRequest, executor, true);
}
/** Syncs operation result to the translog or throws a shard not available failure */

View File

@@ -22,7 +22,6 @@ package org.elasticsearch.index;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.TriFunction;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
@@ -40,6 +39,7 @@ import org.elasticsearch.index.shard.IndexEventListener;
import org.elasticsearch.index.shard.IndexSearcherWrapper;
import org.elasticsearch.index.shard.IndexingOperationListener;
import org.elasticsearch.index.shard.SearchOperationListener;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.similarity.BM25SimilarityProvider;
import org.elasticsearch.index.similarity.SimilarityProvider;
import org.elasticsearch.index.similarity.SimilarityService;

View File

@@ -25,11 +25,15 @@ import org.apache.lucene.search.Sort;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.util.Accountable;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.BigArrays;
@@ -82,6 +86,7 @@ import java.util.Set;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.LongSupplier;
import java.util.function.Supplier;
@@ -109,10 +114,11 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
private final AtomicBoolean closed = new AtomicBoolean(false);
private final AtomicBoolean deleted = new AtomicBoolean(false);
private final IndexSettings indexSettings;
private final List<IndexingOperationListener> indexingOperationListeners;
private final List<SearchOperationListener> searchOperationListeners;
private final List<IndexingOperationListener> indexingOperationListeners;
private volatile AsyncRefreshTask refreshTask;
private volatile AsyncTranslogFSync fsyncTask;
private volatile AsyncGlobalCheckpointTask globalCheckpointTask;
// don't convert to Setting<> and register... we only set this in tests and register via a plugin
private final String INDEX_TRANSLOG_RETENTION_CHECK_INTERVAL_SETTING = "index.translog.retention.check_interval";
@@ -182,11 +188,12 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
this.engineFactory = engineFactory;
// initialize this last -- otherwise if the wrapper requires any other member to be non-null we fail with an NPE
this.searcherWrapper = wrapperFactory.newWrapper(this);
this.indexingOperationListeners = Collections.unmodifiableList(indexingOperationListeners);
this.searchOperationListeners = Collections.unmodifiableList(searchOperationListeners);
this.indexingOperationListeners = Collections.unmodifiableList(indexingOperationListeners);
// kick off async ops for the first shard in this index
this.refreshTask = new AsyncRefreshTask(this);
this.trimTranslogTask = new AsyncTrimTranslogTask(this);
this.globalCheckpointTask = new AsyncGlobalCheckpointTask(this);
rescheduleFsyncTask(indexSettings.getTranslogDurability());
}
@@ -268,7 +275,15 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
}
}
} finally {
IOUtils.close(bitsetFilterCache, indexCache, indexFieldData, mapperService, refreshTask, fsyncTask, trimTranslogTask);
IOUtils.close(
bitsetFilterCache,
indexCache,
indexFieldData,
mapperService,
refreshTask,
fsyncTask,
trimTranslogTask,
globalCheckpointTask);
}
}
}
@@ -293,8 +308,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
}
}
public synchronized IndexShard createShard(ShardRouting routing) throws IOException {
final boolean primary = routing.primary();
public synchronized IndexShard createShard(ShardRouting routing, Consumer<ShardId> globalCheckpointSyncer) throws IOException {
/*
* TODO: we execute this in parallel but it's a synced method. Yet, we might
* be able to serialize the execution via the cluster state in the future. for now we just
@@ -365,7 +379,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
indexShard = new IndexShard(routing, this.indexSettings, path, store, indexSortSupplier,
indexCache, mapperService, similarityService, engineFactory,
eventListener, searcherWrapper, threadPool, bigArrays, engineWarmer,
searchOperationListeners, indexingOperationListeners);
searchOperationListeners, indexingOperationListeners, () -> globalCheckpointSyncer.accept(shardId));
eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created");
eventListener.afterIndexShardCreated(indexShard);
shards = newMapBuilder(shards).put(shardId.id(), indexShard).immutableMap();
@@ -710,6 +724,44 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
}
}
private void maybeSyncGlobalCheckpoints() {
for (final IndexShard shard : this.shards.values()) {
if (shard.routingEntry().active() && shard.routingEntry().primary()) {
switch (shard.state()) {
case CLOSED:
case CREATED:
case RECOVERING:
case RELOCATED:
continue;
case POST_RECOVERY:
assert false : "shard " + shard.shardId() + " is in post-recovery but marked as active";
continue;
case STARTED:
try {
shard.acquirePrimaryOperationPermit(
ActionListener.wrap(
releasable -> {
try (Releasable ignored = releasable) {
shard.maybeSyncGlobalCheckpoint("background");
}
},
e -> {
if (!(e instanceof AlreadyClosedException || e instanceof IndexShardClosedException)) {
logger.info("failed to execute background global checkpoint sync", e);
}
}),
ThreadPool.Names.SAME);
} catch (final AlreadyClosedException | IndexShardClosedException e) {
// the shard was closed concurrently, continue
}
continue;
default:
throw new IllegalStateException("unknown state [" + shard.state() + "]");
}
}
}
}
abstract static class BaseAsyncTask implements Runnable, Closeable {
protected final IndexService indexService;
protected final ThreadPool threadPool;
@@ -877,6 +929,41 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
}
}
// this setting is intentionally not registered, it is only used in tests
public static final Setting<TimeValue> GLOBAL_CHECKPOINT_SYNC_INTERVAL_SETTING =
Setting.timeSetting(
"index.global_checkpoint_sync.interval",
new TimeValue(30, TimeUnit.SECONDS),
new TimeValue(0, TimeUnit.MILLISECONDS),
Property.Dynamic,
Property.IndexScope);
/**
* Background task that syncs the global checkpoint to replicas.
*/
final class AsyncGlobalCheckpointTask extends BaseAsyncTask {
AsyncGlobalCheckpointTask(final IndexService indexService) {
// index.global_checkpoint_sync.interval is not a registered setting; it is only registered in tests
super(indexService, GLOBAL_CHECKPOINT_SYNC_INTERVAL_SETTING.get(indexService.getIndexSettings().getSettings()));
}
@Override
protected void runInternal() {
indexService.maybeSyncGlobalCheckpoints();
}
@Override
protected String getThreadPool() {
return ThreadPool.Names.GENERIC;
}
@Override
public String toString() {
return "global_checkpoint_sync";
}
}
AsyncRefreshTask getRefreshTask() { // for tests
return refreshTask;
}
@@ -885,6 +972,10 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
return fsyncTask;
}
AsyncGlobalCheckpointTask getGlobalCheckpointTask() {
return globalCheckpointTask;
}
/**
* Clears the caches for the given shard id if the shard is still allocated on this node
*/

View File

@@ -19,7 +19,10 @@
package org.elasticsearch.index.seqno;
import org.apache.lucene.store.AlreadyClosedException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.replication.ReplicationOperation;
@@ -34,6 +37,7 @@ import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.shard.IndexEventListener;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardClosedException;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.threadpool.ThreadPool;
@@ -47,7 +51,7 @@ import org.elasticsearch.transport.TransportService;
public class GlobalCheckpointSyncAction extends TransportReplicationAction<
GlobalCheckpointSyncAction.Request,
GlobalCheckpointSyncAction.Request,
ReplicationResponse> implements IndexEventListener {
ReplicationResponse> {
public static String ACTION_NAME = "indices:admin/seq_no/global_checkpoint_sync";
@@ -73,7 +77,17 @@ public class GlobalCheckpointSyncAction extends TransportReplicationAction<
indexNameExpressionResolver,
Request::new,
Request::new,
ThreadPool.Names.SAME);
ThreadPool.Names.MANAGEMENT);
}
public void updateGlobalCheckpointForShard(final ShardId shardId) {
execute(
new Request(shardId),
ActionListener.wrap(r -> {}, e -> {
if (ExceptionsHelper.unwrap(e, AlreadyClosedException.class, IndexShardClosedException.class) == null) {
logger.info(shardId + " global checkpoint sync failed", e);
}
}));
}
@Override
@@ -94,11 +108,6 @@ public class GlobalCheckpointSyncAction extends TransportReplicationAction<
}
}
@Override
public void onShardInactive(final IndexShard indexShard) {
execute(new Request(indexShard.shardId()));
}
@Override
protected PrimaryResult<Request, ReplicationResponse> shardOperationOnPrimary(
final Request request, final IndexShard indexShard) throws Exception {

View File

@@ -209,13 +209,20 @@ public class GlobalCheckpointTracker extends AbstractIndexShardComponent {
}
}
synchronized ObjectLongMap<String> getGlobalCheckpoints() {
/**
* Get the local knowledge of the global checkpoints for all in-sync allocation IDs.
*
* @return a map from allocation ID to the local knowledge of the global checkpoint for that allocation ID
*/
synchronized ObjectLongMap<String> getInSyncGlobalCheckpoints() {
assert primaryMode;
assert handoffInProgress == false;
final ObjectLongMap<String> globalCheckpoints = new ObjectLongHashMap<>(checkpoints.size());
for (final Map.Entry<String, CheckpointState> cps : checkpoints.entrySet()) {
globalCheckpoints.put(cps.getKey(), cps.getValue().globalCheckpoint);
}
final ObjectLongMap<String> globalCheckpoints = new ObjectLongHashMap<>(checkpoints.size()); // upper bound on the size
checkpoints
.entrySet()
.stream()
.filter(e -> e.getValue().inSync)
.forEach(e -> globalCheckpoints.put(e.getKey(), e.getValue().globalCheckpoint));
return globalCheckpoints;
}

View File

@@ -138,8 +138,13 @@ public class SequenceNumbersService extends AbstractIndexShardComponent {
globalCheckpointTracker.updateGlobalCheckpointForShard(allocationId, globalCheckpoint);
}
public ObjectLongMap<String> getGlobalCheckpoints() {
return globalCheckpointTracker.getGlobalCheckpoints();
/**
* Get the local knowledge of the global checkpoints for all in-sync allocation IDs.
*
* @return a map from allocation ID to the local knowledge of the global checkpoint for that allocation ID
*/
public ObjectLongMap<String> getInSyncGlobalCheckpoints() {
return globalCheckpointTracker.getInSyncGlobalCheckpoints();
}
/**

View File

@@ -156,6 +156,7 @@ import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import static org.elasticsearch.index.mapper.SourceToParse.source;
@@ -197,6 +198,11 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
protected final EngineFactory engineFactory;
private final IndexingOperationListener indexingOperationListeners;
private final Runnable globalCheckpointSyncer;
Runnable getGlobalCheckpointSyncer() {
return globalCheckpointSyncer;
}
@Nullable
private RecoveryState recoveryState;
@@ -233,11 +239,24 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
*/
private final RefreshListeners refreshListeners;
public IndexShard(ShardRouting shardRouting, IndexSettings indexSettings, ShardPath path, Store store,
Supplier<Sort> indexSortSupplier, IndexCache indexCache, MapperService mapperService, SimilarityService similarityService,
@Nullable EngineFactory engineFactory,
IndexEventListener indexEventListener, IndexSearcherWrapper indexSearcherWrapper, ThreadPool threadPool, BigArrays bigArrays,
Engine.Warmer warmer, List<SearchOperationListener> searchOperationListener, List<IndexingOperationListener> listeners) throws IOException {
public IndexShard(
ShardRouting shardRouting,
IndexSettings indexSettings,
ShardPath path,
Store store,
Supplier<Sort> indexSortSupplier,
IndexCache indexCache,
MapperService mapperService,
SimilarityService similarityService,
@Nullable EngineFactory engineFactory,
IndexEventListener indexEventListener,
IndexSearcherWrapper indexSearcherWrapper,
ThreadPool threadPool,
BigArrays bigArrays,
Engine.Warmer warmer,
List<SearchOperationListener> searchOperationListener,
List<IndexingOperationListener> listeners,
Runnable globalCheckpointSyncer) throws IOException {
super(shardRouting.shardId(), indexSettings);
assert shardRouting.initializing();
this.shardRouting = shardRouting;
@@ -257,6 +276,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
final List<IndexingOperationListener> listenersList = new ArrayList<>(listeners);
listenersList.add(internalIndexingStats);
this.indexingOperationListeners = new IndexingOperationListener.CompositeListener(listenersList, logger);
this.globalCheckpointSyncer = globalCheckpointSyncer;
final List<SearchOperationListener> searchListenersList = new ArrayList<>(searchOperationListener);
searchListenersList.add(searchStats);
this.searchOperationListener = new SearchOperationListener.CompositeListener(searchListenersList, logger);
@@ -1723,11 +1743,6 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
public void markAllocationIdAsInSync(final String allocationId, final long localCheckpoint) throws InterruptedException {
verifyPrimary();
getEngine().seqNoService().markAllocationIdAsInSync(allocationId, localCheckpoint);
/*
* We could have blocked so long waiting for the replica to catch up that we fell idle and there will not be a background sync to
* the replica; mark our self as active to force a future background sync.
*/
active.compareAndSet(false, true);
}
/**
@@ -1748,10 +1763,44 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
return getEngine().seqNoService().getGlobalCheckpoint();
}
public ObjectLongMap<String> getGlobalCheckpoints() {
/**
* Get the local knowledge of the global checkpoints for all in-sync allocation IDs.
*
* @return a map from allocation ID to the local knowledge of the global checkpoint for that allocation ID
*/
public ObjectLongMap<String> getInSyncGlobalCheckpoints() {
verifyPrimary();
verifyNotClosed();
return getEngine().seqNoService().getGlobalCheckpoints();
return getEngine().seqNoService().getInSyncGlobalCheckpoints();
}
/**
* Syncs the global checkpoint to the replicas if the global checkpoint on at least one replica is behind the global checkpoint on the
* primary.
*/
public void maybeSyncGlobalCheckpoint(final String reason) {
verifyPrimary();
verifyNotClosed();
if (state == IndexShardState.RELOCATED) {
return;
}
// only sync if there are no operations in flight
final SeqNoStats stats = getEngine().seqNoService().stats();
if (stats.getMaxSeqNo() == stats.getGlobalCheckpoint()) {
final ObjectLongMap<String> globalCheckpoints = getInSyncGlobalCheckpoints();
final String allocationId = routingEntry().allocationId().getId();
assert globalCheckpoints.containsKey(allocationId);
final long globalCheckpoint = globalCheckpoints.get(allocationId);
final boolean syncNeeded =
StreamSupport
.stream(globalCheckpoints.values().spliterator(), false)
.anyMatch(v -> v.value < globalCheckpoint);
// only sync if there is a shard lagging the primary
if (syncNeeded) {
logger.trace("syncing global checkpoint for [{}]", reason);
globalCheckpointSyncer.run();
}
}
}
/**

View File

@@ -43,7 +43,6 @@ import org.elasticsearch.cluster.routing.RecoverySource;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.CheckedFunction;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
@@ -375,12 +374,15 @@ public class IndicesService extends AbstractLifecycleComponent
/**
* Creates a new {@link IndexService} for the given metadata.
* @param indexMetaData the index metadata to create the index for
* @param builtInListeners a list of built-in lifecycle {@link IndexEventListener} that should be used alongside the per-index listeners
*
* @param indexMetaData the index metadata to create the index for
* @param builtInListeners a list of built-in lifecycle {@link IndexEventListener} that should be used alongside the
* per-index listeners
* @throws ResourceAlreadyExistsException if the index already exists.
*/
@Override
public synchronized IndexService createIndex(IndexMetaData indexMetaData, List<IndexEventListener> builtInListeners) throws IOException {
public synchronized IndexService createIndex(
final IndexMetaData indexMetaData, final List<IndexEventListener> builtInListeners) throws IOException {
ensureChangesAllowed();
if (indexMetaData.getIndexUUID().equals(IndexMetaData.INDEX_UUID_NA_VALUE)) {
throw new IllegalArgumentException("index must have a real UUID found value: [" + indexMetaData.getIndexUUID() + "]");
@@ -399,13 +401,13 @@ public class IndicesService extends AbstractLifecycleComponent
finalListeners.add(onStoreClose);
finalListeners.add(oldShardsStats);
final IndexService indexService =
createIndexService(
"create index",
indexMetaData,
indicesQueryCache,
indicesFieldDataCache,
finalListeners,
indexingMemoryController);
createIndexService(
"create index",
indexMetaData,
indicesQueryCache,
indicesFieldDataCache,
finalListeners,
indexingMemoryController);
boolean success = false;
try {
indexService.getIndexEventListener().afterIndexCreated(indexService);
@@ -423,7 +425,8 @@ public class IndicesService extends AbstractLifecycleComponent
* This creates a new IndexService without registering it
*/
private synchronized IndexService createIndexService(final String reason,
IndexMetaData indexMetaData, IndicesQueryCache indicesQueryCache,
IndexMetaData indexMetaData,
IndicesQueryCache indicesQueryCache,
IndicesFieldDataCache indicesFieldDataCache,
List<IndexEventListener> builtInListeners,
IndexingOperationListener... indexingOperationListeners) throws IOException {
@ -454,7 +457,8 @@ public class IndicesService extends AbstractLifecycleComponent
indicesQueryCache,
mapperRegistry,
indicesFieldDataCache,
namedWriteableRegistry);
namedWriteableRegistry
);
}
/**
@@ -499,10 +503,11 @@ public class IndicesService extends AbstractLifecycleComponent
@Override
public IndexShard createShard(ShardRouting shardRouting, RecoveryState recoveryState, PeerRecoveryTargetService recoveryTargetService,
PeerRecoveryTargetService.RecoveryListener recoveryListener, RepositoriesService repositoriesService,
Consumer<IndexShard.ShardFailure> onShardFailure) throws IOException {
Consumer<IndexShard.ShardFailure> onShardFailure,
Consumer<ShardId> globalCheckpointSyncer) throws IOException {
ensureChangesAllowed();
IndexService indexService = indexService(shardRouting.index());
IndexShard indexShard = indexService.createShard(shardRouting);
IndexShard indexShard = indexService.createShard(shardRouting, globalCheckpointSyncer);
indexShard.addShardFailureCallback(onShardFailure);
indexShard.startRecovery(recoveryState, recoveryTargetService, recoveryListener, repositoriesService,
(type, mapping) -> {

View File

@@ -118,35 +118,44 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
private final boolean sendRefreshMapping;
private final List<IndexEventListener> buildInIndexListener;
private final PrimaryReplicaSyncer primaryReplicaSyncer;
private final Consumer<ShardId> globalCheckpointSyncer;
@Inject
public IndicesClusterStateService(Settings settings, IndicesService indicesService, ClusterService clusterService,
ThreadPool threadPool, PeerRecoveryTargetService recoveryTargetService,
public IndicesClusterStateService(Settings settings,
IndicesService indicesService,
ClusterService clusterService,
ThreadPool threadPool,
PeerRecoveryTargetService recoveryTargetService,
ShardStateAction shardStateAction,
NodeMappingRefreshAction nodeMappingRefreshAction,
RepositoriesService repositoriesService,
SearchService searchService, SyncedFlushService syncedFlushService,
PeerRecoverySourceService peerRecoverySourceService, SnapshotShardsService snapshotShardsService,
GlobalCheckpointSyncAction globalCheckpointSyncAction,
PrimaryReplicaSyncer primaryReplicaSyncer) {
SearchService searchService,
SyncedFlushService syncedFlushService,
PeerRecoverySourceService peerRecoverySourceService,
SnapshotShardsService snapshotShardsService,
PrimaryReplicaSyncer primaryReplicaSyncer,
GlobalCheckpointSyncAction globalCheckpointSyncAction) {
this(settings, (AllocatedIndices<? extends Shard, ? extends AllocatedIndex<? extends Shard>>) indicesService,
clusterService, threadPool, recoveryTargetService, shardStateAction,
nodeMappingRefreshAction, repositoriesService, searchService, syncedFlushService, peerRecoverySourceService,
snapshotShardsService, globalCheckpointSyncAction, primaryReplicaSyncer);
snapshotShardsService, primaryReplicaSyncer, globalCheckpointSyncAction::updateGlobalCheckpointForShard);
}
// for tests
IndicesClusterStateService(Settings settings,
AllocatedIndices<? extends Shard, ? extends AllocatedIndex<? extends Shard>> indicesService,
ClusterService clusterService,
ThreadPool threadPool, PeerRecoveryTargetService recoveryTargetService,
ThreadPool threadPool,
PeerRecoveryTargetService recoveryTargetService,
ShardStateAction shardStateAction,
NodeMappingRefreshAction nodeMappingRefreshAction,
RepositoriesService repositoriesService,
SearchService searchService, SyncedFlushService syncedFlushService,
PeerRecoverySourceService peerRecoverySourceService, SnapshotShardsService snapshotShardsService,
GlobalCheckpointSyncAction globalCheckpointSyncAction,
PrimaryReplicaSyncer primaryReplicaSyncer) {
SearchService searchService,
SyncedFlushService syncedFlushService,
PeerRecoverySourceService peerRecoverySourceService,
SnapshotShardsService snapshotShardsService,
PrimaryReplicaSyncer primaryReplicaSyncer,
Consumer<ShardId> globalCheckpointSyncer) {
super(settings);
this.buildInIndexListener =
Arrays.asList(
@@ -154,8 +163,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
recoveryTargetService,
searchService,
syncedFlushService,
snapshotShardsService,
globalCheckpointSyncAction);
snapshotShardsService);
this.indicesService = indicesService;
this.clusterService = clusterService;
this.threadPool = threadPool;
@@ -164,6 +172,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
this.nodeMappingRefreshAction = nodeMappingRefreshAction;
this.repositoriesService = repositoriesService;
this.primaryReplicaSyncer = primaryReplicaSyncer;
this.globalCheckpointSyncer = globalCheckpointSyncer;
this.sendRefreshMapping = this.settings.getAsBoolean("indices.cluster.send_refresh_mapping", true);
}
@@ -541,7 +550,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
logger.debug("{} creating shard", shardRouting.shardId());
RecoveryState recoveryState = new RecoveryState(shardRouting, nodes.getLocalNode(), sourceNode);
indicesService.createShard(shardRouting, recoveryState, recoveryTargetService, new RecoveryListener(shardRouting),
repositoriesService, failedShardHandler);
repositoriesService, failedShardHandler, globalCheckpointSyncer);
} catch (Exception e) {
failAndRemoveShard(shardRouting, true, "failed to create shard", e, state);
}
@@ -830,7 +839,8 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
*/
T createShard(ShardRouting shardRouting, RecoveryState recoveryState, PeerRecoveryTargetService recoveryTargetService,
PeerRecoveryTargetService.RecoveryListener recoveryListener, RepositoriesService repositoriesService,
Consumer<IndexShard.ShardFailure> onShardFailure) throws IOException;
Consumer<IndexShard.ShardFailure> onShardFailure,
Consumer<ShardId> globalCheckpointSyncer) throws IOException;
/**
* Returns shard for the specified id if it exists otherwise returns <code>null</code>.

View File

@@ -215,6 +215,8 @@ public class ClusterDisruptionIT extends AbstractDisruptionTestCase {
}
}, 30, TimeUnit.SECONDS);
assertSeqNos();
logger.info("done validating (iteration [{}])", iter);
}
} finally {

View File

@@ -141,7 +141,7 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
ReplicationGroup(final IndexMetaData indexMetaData) throws IOException {
final ShardRouting primaryRouting = this.createShardRouting("s0", true);
primary = newShard(primaryRouting, indexMetaData, null, getEngineFactory(primaryRouting));
primary = newShard(primaryRouting, indexMetaData, null, getEngineFactory(primaryRouting), () -> {});
replicas = new ArrayList<>();
this.indexMetaData = indexMetaData;
updateAllocationIDsOnPrimary();
@@ -238,7 +238,7 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
public IndexShard addReplica() throws IOException {
final ShardRouting replicaRouting = createShardRouting("s" + replicaId.incrementAndGet(), false);
final IndexShard replica =
newShard(replicaRouting, indexMetaData, null, getEngineFactory(replicaRouting));
newShard(replicaRouting, indexMetaData, null, getEngineFactory(replicaRouting), () -> {});
addReplica(replica);
return replica;
}
@@ -259,8 +259,8 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
false, ShardRoutingState.INITIALIZING,
RecoverySource.PeerRecoverySource.INSTANCE);
final IndexShard newReplica = newShard(shardRouting, shardPath, indexMetaData, null,
getEngineFactory(shardRouting));
final IndexShard newReplica =
newShard(shardRouting, shardPath, indexMetaData, null, getEngineFactory(shardRouting), () -> {});
replicas.add(newReplica);
updateAllocationIDsOnPrimary();
return newReplica;

View File

@@ -0,0 +1,210 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.seqno;
import org.elasticsearch.action.admin.indices.stats.IndexShardStats;
import org.elasticsearch.action.admin.indices.stats.IndexStats;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
import org.elasticsearch.action.admin.indices.stats.ShardStats;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.InternalSettingsPlugin;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.hamcrest.Matchers.equalTo;
public class GlobalCheckpointSyncIT extends ESIntegTestCase {
@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Stream.concat(
super.nodePlugins().stream(),
Stream.of(InternalSettingsPlugin.class, MockTransportService.TestPlugin.class))
.collect(Collectors.toList());
}
public void testPostOperationGlobalCheckpointSync() throws Exception {
// set the sync interval high so it does not execute during this test
runGlobalCheckpointSyncTest(TimeValue.timeValueHours(24), client -> {}, client -> {});
}
/*
* This test swallows the post-operation global checkpoint syncs, and then restores the ability to send these requests at the end of the
* test so that a background sync can fire and sync the global checkpoint.
*/
public void testBackgroundGlobalCheckpointSync() throws Exception {
runGlobalCheckpointSyncTest(
TimeValue.timeValueSeconds(randomIntBetween(1, 3)),
client -> {
// prevent global checkpoint syncs between all nodes
final DiscoveryNodes nodes = client.admin().cluster().prepareState().get().getState().getNodes();
for (final DiscoveryNode node : nodes) {
for (final DiscoveryNode other : nodes) {
if (node == other) {
continue;
}
final MockTransportService senderTransportService =
(MockTransportService) internalCluster().getInstance(TransportService.class, node.getName());
final MockTransportService receiverTransportService =
(MockTransportService) internalCluster().getInstance(TransportService.class, other.getName());
senderTransportService.addDelegate(receiverTransportService,
new MockTransportService.DelegateTransport(senderTransportService.original()) {
@Override
protected void sendRequest(
final Connection connection,
final long requestId,
final String action,
final TransportRequest request,
final TransportRequestOptions options) throws IOException {
if ("indices:admin/seq_no/global_checkpoint_sync[r]".equals(action)) {
throw new IllegalStateException("blocking indices:admin/seq_no/global_checkpoint_sync[r]");
} else {
super.sendRequest(connection, requestId, action, request, options);
}
}
});
}
}
},
client -> {
// restore global checkpoint syncs between all nodes
final DiscoveryNodes nodes = client.admin().cluster().prepareState().get().getState().getNodes();
for (final DiscoveryNode node : nodes) {
for (final DiscoveryNode other : nodes) {
if (node == other) {
continue;
}
final MockTransportService senderTransportService =
(MockTransportService) internalCluster().getInstance(TransportService.class, node.getName());
final MockTransportService receiverTransportService =
(MockTransportService) internalCluster().getInstance(TransportService.class, other.getName());
senderTransportService.clearRule(receiverTransportService);
}
}
});
}
private void runGlobalCheckpointSyncTest(
final TimeValue globalCheckpointSyncInterval,
final Consumer<Client> beforeIndexing,
final Consumer<Client> afterIndexing) throws Exception {
final int numberOfReplicas = randomIntBetween(1, 4);
internalCluster().ensureAtLeastNumDataNodes(1 + numberOfReplicas);
prepareCreate(
"test",
Settings.builder()
.put(IndexService.GLOBAL_CHECKPOINT_SYNC_INTERVAL_SETTING.getKey(), globalCheckpointSyncInterval)
.put("index.number_of_replicas", numberOfReplicas))
.get();
if (randomBoolean()) {
ensureGreen();
}
beforeIndexing.accept(client());
final int numberOfDocuments = randomIntBetween(0, 256);
final int numberOfThreads = randomIntBetween(1, 4);
final CyclicBarrier barrier = new CyclicBarrier(1 + numberOfThreads);
// start concurrent indexing threads
final List<Thread> threads = new ArrayList<>(numberOfThreads);
for (int i = 0; i < numberOfThreads; i++) {
final int index = i;
final Thread thread = new Thread(() -> {
try {
barrier.await();
} catch (BrokenBarrierException | InterruptedException e) {
throw new RuntimeException(e);
}
for (int j = 0; j < numberOfDocuments; j++) {
final String id = Integer.toString(index * numberOfDocuments + j);
client().prepareIndex("test", "test", id).setSource("{\"foo\": " + id + "}", XContentType.JSON).get();
}
try {
barrier.await();
} catch (BrokenBarrierException | InterruptedException e) {
throw new RuntimeException(e);
}
});
threads.add(thread);
thread.start();
}
// synchronize the start of the threads
barrier.await();
// wait for the threads to finish
barrier.await();
afterIndexing.accept(client());
assertBusy(() -> {
final IndicesStatsResponse stats = client().admin().indices().prepareStats().clear().get();
final IndexStats indexStats = stats.getIndex("test");
for (final IndexShardStats indexShardStats : indexStats.getIndexShards().values()) {
Optional<ShardStats> maybePrimary =
Stream.of(indexShardStats.getShards())
.filter(s -> s.getShardRouting().active() && s.getShardRouting().primary())
.findFirst();
if (!maybePrimary.isPresent()) {
continue;
}
final ShardStats primary = maybePrimary.get();
final SeqNoStats primarySeqNoStats = primary.getSeqNoStats();
for (final ShardStats shardStats : indexShardStats) {
final SeqNoStats seqNoStats = shardStats.getSeqNoStats();
if (seqNoStats == null) {
// the shard is initializing
continue;
}
assertThat(seqNoStats.getGlobalCheckpoint(), equalTo(primarySeqNoStats.getGlobalCheckpoint()));
}
}
});
for (final Thread thread : threads) {
thread.join();
}
}
}

View File

@@ -539,7 +539,7 @@ public class IndexShardIT extends ESSingleNodeTestCase {
IndexShard newShard = new IndexShard(initializingShardRouting, indexService.getIndexSettings(), shard.shardPath(),
shard.store(), indexService.getIndexSortSupplier(), indexService.cache(), indexService.mapperService(), indexService.similarityService(),
shard.getEngineFactory(), indexService.getIndexEventListener(), wrapper,
indexService.getThreadPool(), indexService.getBigArrays(), null, Collections.emptyList(), Arrays.asList(listeners));
indexService.getThreadPool(), indexService.getBigArrays(), null, Collections.emptyList(), Arrays.asList(listeners), () -> {});
return newShard;
}

View File

@@ -136,6 +136,7 @@ import java.util.stream.IntStream;
import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
import static java.util.Collections.max;
import static org.elasticsearch.cluster.routing.TestShardRouting.newShardRouting;
import static org.elasticsearch.common.lucene.Lucene.cleanLuceneIndex;
import static org.elasticsearch.common.xcontent.ToXContent.EMPTY_PARAMS;
@@ -711,6 +712,66 @@ public class IndexShardTests extends IndexShardTestCase {
closeShards(indexShard);
}
public void testGlobalCheckpointSync() throws IOException {
// create the primary shard with a callback that sets a boolean when the global checkpoint sync is invoked
final ShardId shardId = new ShardId("index", "_na_", 0);
final ShardRouting shardRouting =
TestShardRouting.newShardRouting(
shardId,
randomAlphaOfLength(8),
true,
ShardRoutingState.INITIALIZING,
RecoverySource.StoreRecoverySource.EMPTY_STORE_INSTANCE);
final Settings settings = Settings.builder()
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 2)
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.build();
final IndexMetaData.Builder indexMetadata = IndexMetaData.builder(shardRouting.getIndexName()).settings(settings).primaryTerm(0, 1);
final AtomicBoolean synced = new AtomicBoolean();
final IndexShard primaryShard = newShard(shardRouting, indexMetadata.build(), null, null, () -> { synced.set(true); });
// add a replica
recoverShardFromStore(primaryShard);
final IndexShard replicaShard = newShard(shardId, false);
recoverReplica(replicaShard, primaryShard);
final int maxSeqNo = randomIntBetween(0, 128);
for (int i = 0; i < maxSeqNo; i++) {
primaryShard.getEngine().seqNoService().generateSeqNo();
}
final long checkpoint = rarely() ? maxSeqNo - scaledRandomIntBetween(0, maxSeqNo) : maxSeqNo;
// set up local checkpoints on the shard copies
primaryShard.updateLocalCheckpointForShard(shardRouting.allocationId().getId(), checkpoint);
final int replicaLocalCheckpoint = randomIntBetween(0, Math.toIntExact(checkpoint));
final String replicaAllocationId = replicaShard.routingEntry().allocationId().getId();
primaryShard.updateLocalCheckpointForShard(replicaAllocationId, replicaLocalCheckpoint);
// initialize the local knowledge on the primary of the global checkpoint on the replica shards
final int replicaGlobalCheckpoint = Math.toIntExact(primaryShard.getGlobalCheckpoint());
primaryShard.updateGlobalCheckpointForShard(
replicaAllocationId,
randomIntBetween(Math.toIntExact(SequenceNumbers.NO_OPS_PERFORMED), replicaGlobalCheckpoint));
// simulate a background maybe sync; it should only run if the knowledge on the replica of the global checkpoint lags the primary
primaryShard.maybeSyncGlobalCheckpoint("test");
assertThat(
synced.get(),
equalTo(maxSeqNo == primaryShard.getGlobalCheckpoint() && (replicaGlobalCheckpoint < checkpoint)));
// simulate that the background sync advanced the global checkpoint on the replica
primaryShard.updateGlobalCheckpointForShard(replicaAllocationId, primaryShard.getGlobalCheckpoint());
// reset our boolean so that we can assert after another simulated maybe sync
synced.set(false);
primaryShard.maybeSyncGlobalCheckpoint("test");
// this time there should not be a sync since all the replica copies are caught up with the primary
assertFalse(synced.get());
closeShards(replicaShard, primaryShard);
}
public void testRestoreLocalCheckpointTrackerFromTranslogOnPromotion() throws IOException, InterruptedException {
final IndexShard indexShard = newStartedShard(false);
final int operations = 1024 - scaledRandomIntBetween(0, 1024);
@@ -1678,7 +1739,7 @@ public class IndexShardTests extends IndexShardTestCase {
closeShards(shard);
IndexShard newShard = newShard(
ShardRoutingHelper.initWithSameId(shard.routingEntry(), RecoverySource.StoreRecoverySource.EXISTING_STORE_INSTANCE),
shard.shardPath(), shard.indexSettings().getIndexMetaData(), wrapper, null);
shard.shardPath(), shard.indexSettings().getIndexMetaData(), wrapper, null, () -> {});
recoverShardFromStore(newShard);
@@ -1824,7 +1885,7 @@ public class IndexShardTests extends IndexShardTestCase {
closeShards(shard);
IndexShard newShard = newShard(
ShardRoutingHelper.initWithSameId(shard.routingEntry(), RecoverySource.StoreRecoverySource.EXISTING_STORE_INSTANCE),
shard.shardPath(), shard.indexSettings().getIndexMetaData(), wrapper, null);
shard.shardPath(), shard.indexSettings().getIndexMetaData(), wrapper, null, () -> {});
recoverShardFromStore(newShard);

View File

@@ -130,7 +130,7 @@ public class IndicesLifecycleListenerSingleNodeTests extends ESSingleNodeTestCas
newRouting = newRouting.moveToUnassigned(unassignedInfo)
.updateUnassigned(unassignedInfo, RecoverySource.StoreRecoverySource.EMPTY_STORE_INSTANCE);
newRouting = ShardRoutingHelper.initialize(newRouting, nodeId);
IndexShard shard = index.createShard(newRouting);
IndexShard shard = index.createShard(newRouting, s -> {});
IndexShardTestCase.updateRoutingEntry(shard, newRouting);
assertEquals(5, counter.get());
final DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(),

View File

@@ -226,7 +226,8 @@ public abstract class AbstractIndicesClusterStateServiceTestCase extends ESTestC
PeerRecoveryTargetService recoveryTargetService,
PeerRecoveryTargetService.RecoveryListener recoveryListener,
RepositoriesService repositoriesService,
Consumer<IndexShard.ShardFailure> onShardFailure) throws IOException {
Consumer<IndexShard.ShardFailure> onShardFailure,
Consumer<ShardId> globalCheckpointSyncer) throws IOException {
failRandomly();
MockIndexService indexService = indexService(recoveryState.getShardId().getIndex());
MockIndexShard indexShard = indexService.createShard(shardRouting);

View File

@@ -410,20 +410,20 @@ public class IndicesClusterStateServiceRandomUpdatesTests extends AbstractIndice
final ShardStateAction shardStateAction = mock(ShardStateAction.class);
final PrimaryReplicaSyncer primaryReplicaSyncer = mock(PrimaryReplicaSyncer.class);
return new IndicesClusterStateService(
settings,
indicesService,
clusterService,
threadPool,
recoveryTargetService,
shardStateAction,
null,
repositoriesService,
null,
null,
null,
null,
null,
primaryReplicaSyncer);
settings,
indicesService,
clusterService,
threadPool,
recoveryTargetService,
shardStateAction,
null,
repositoriesService,
null,
null,
null,
null,
primaryReplicaSyncer,
s -> {});
}
private class RecordingIndicesService extends MockIndicesService {

View File

@@ -20,22 +20,16 @@
package org.elasticsearch.recovery;
import com.carrotsearch.hppc.IntHashSet;
import com.carrotsearch.hppc.ObjectLongMap;
import com.carrotsearch.hppc.procedures.IntProcedure;
import org.apache.lucene.index.IndexFileNames;
import org.apache.lucene.util.English;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.indices.stats.IndexShardStats;
import org.elasticsearch.action.admin.indices.stats.IndexStats;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
import org.elasticsearch.action.admin.indices.stats.ShardStats;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand;
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
@@ -47,13 +41,10 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.seqno.SeqNoStats;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.IndexEventListener;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.recovery.PeerRecoveryTargetService;
import org.elasticsearch.indices.recovery.RecoveryFileChunkRequest;
import org.elasticsearch.plugins.Plugin;
@@ -63,6 +54,7 @@ import org.elasticsearch.test.BackgroundIndexer;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
import org.elasticsearch.test.ESIntegTestCase.Scope;
import org.elasticsearch.test.InternalSettingsPlugin;
import org.elasticsearch.test.MockIndexEventListener;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.test.transport.MockTransportService;
@@ -81,7 +73,6 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Semaphore;
@@ -104,48 +95,13 @@ public class RelocationIT extends ESIntegTestCase {
@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(MockTransportService.TestPlugin.class, MockIndexEventListener.TestPlugin.class);
return Arrays.asList(InternalSettingsPlugin.class, MockTransportService.TestPlugin.class, MockIndexEventListener.TestPlugin.class);
}
@Override
protected void beforeIndexDeletion() throws Exception {
super.beforeIndexDeletion();
assertBusy(() -> {
IndicesStatsResponse stats = client().admin().indices().prepareStats().clear().get();
for (IndexStats indexStats : stats.getIndices().values()) {
for (IndexShardStats indexShardStats : indexStats.getIndexShards().values()) {
Optional<ShardStats> maybePrimary = Stream.of(indexShardStats.getShards())
.filter(s -> s.getShardRouting().active() && s.getShardRouting().primary())
.findFirst();
if (maybePrimary.isPresent() == false) {
continue;
}
ShardStats primary = maybePrimary.get();
final SeqNoStats primarySeqNoStats = primary.getSeqNoStats();
final ShardRouting primaryShardRouting = primary.getShardRouting();
assertThat(primaryShardRouting + " should have set the global checkpoint",
primarySeqNoStats.getGlobalCheckpoint(), not(equalTo(SequenceNumbers.UNASSIGNED_SEQ_NO)));
final DiscoveryNode node = clusterService().state().nodes().get(primaryShardRouting.currentNodeId());
final IndicesService indicesService =
internalCluster().getInstance(IndicesService.class, node.getName());
final IndexShard indexShard = indicesService.getShardOrNull(primaryShardRouting.shardId());
final ObjectLongMap<String> globalCheckpoints = indexShard.getGlobalCheckpoints();
for (ShardStats shardStats : indexShardStats) {
final SeqNoStats seqNoStats = shardStats.getSeqNoStats();
assertThat(shardStats.getShardRouting() + " local checkpoint mismatch",
seqNoStats.getLocalCheckpoint(), equalTo(primarySeqNoStats.getLocalCheckpoint()));
assertThat(shardStats.getShardRouting() + " global checkpoint mismatch",
seqNoStats.getGlobalCheckpoint(), equalTo(primarySeqNoStats.getGlobalCheckpoint()));
assertThat(shardStats.getShardRouting() + " max seq no mismatch",
seqNoStats.getMaxSeqNo(), equalTo(primarySeqNoStats.getMaxSeqNo()));
// the local knowledge on the primary of the global checkpoint equals the global checkpoint on the shard
assertThat(
seqNoStats.getGlobalCheckpoint(),
equalTo(globalCheckpoints.get(shardStats.getShardRouting().allocationId().getId())));
}
}
}
});
assertSeqNos();
}
public void testSimpleRelocationNoIndexing() {
@@ -301,11 +257,14 @@ public class RelocationIT extends ESIntegTestCase {
nodes[0] = internalCluster().startNode();
logger.info("--> creating test index ...");
prepareCreate("test", Settings.builder()
.put("index.number_of_shards", 1)
.put("index.number_of_replicas", numberOfReplicas)
.put("index.refresh_interval", -1) // we want to control refreshes c
).get();
prepareCreate(
"test",
Settings.builder()
.put("index.number_of_shards", 1)
.put("index.number_of_replicas", numberOfReplicas)
.put("index.refresh_interval", -1) // we want to control refreshes
.put(IndexService.GLOBAL_CHECKPOINT_SYNC_INTERVAL_SETTING.getKey(), "100ms"))
.get();
for (int i = 1; i < numberOfNodes; i++) {
logger.info("--> starting [node_{}] ...", i);
@@ -383,9 +342,6 @@ public class RelocationIT extends ESIntegTestCase {
}
// refresh is a replication action so this forces a global checkpoint sync which is needed as these are asserted on in tear down
client().admin().indices().prepareRefresh("test").get();
}
public void testCancellationCleansTempFiles() throws Exception {
@@ -481,11 +437,12 @@ public class RelocationIT extends ESIntegTestCase {
logger.info("red nodes: {}", (Object)redNodes);
ensureStableCluster(halfNodes * 2);
assertAcked(prepareCreate("test", Settings.builder()
.put("index.routing.allocation.exclude.color", "blue")
.put(indexSettings())
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, randomInt(halfNodes - 1))
));
final Settings.Builder settings = Settings.builder()
.put("index.routing.allocation.exclude.color", "blue")
.put(indexSettings())
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, randomInt(halfNodes - 1))
.put(IndexService.GLOBAL_CHECKPOINT_SYNC_INTERVAL_SETTING.getKey(), "100ms");
assertAcked(prepareCreate("test", settings));
assertAllShardsOnNodes("test", redNodes);
int numDocs = randomIntBetween(100, 150);
ArrayList<String> ids = new ArrayList<>();
@@ -526,8 +483,6 @@ public class RelocationIT extends ESIntegTestCase {
assertSearchHits(afterRelocation, ids.toArray(new String[ids.size()]));
}
// refresh is a replication action so this forces a global checkpoint sync which is needed as these are asserted on in tear down
client().admin().indices().prepareRefresh("test").get();
}
class RecoveryCorruption extends MockTransportService.DelegateTransport {

View File

@@ -213,18 +213,17 @@ public class IndexingIT extends ESRestTestCase {
final int numberOfInitialDocs = 1 + randomInt(5);
logger.info("indexing [{}] docs initially", numberOfInitialDocs);
numDocs += indexDocs(index, numDocs, numberOfInitialDocs);
assertOK(client().performRequest("POST", index + "/_refresh")); // this forces a global checkpoint sync
assertSeqNoOnShards(index, nodes, numDocs, newNodeClient);
logger.info("allowing shards on all nodes");
updateIndexSetting(index, Settings.builder().putNull("index.routing.allocation.include._name"));
ensureGreen();
assertOK(client().performRequest("POST", index + "/_refresh"));
for (final String bwcName : bwcNamesList) {
assertCount(index, "_only_nodes:" + bwcName, numDocs);
}
final int numberOfDocsAfterAllowingShardsOnAllNodes = 1 + randomInt(5);
logger.info("indexing [{}] docs after allowing shards on all nodes", numberOfDocsAfterAllowingShardsOnAllNodes);
numDocs += indexDocs(index, numDocs, numberOfDocsAfterAllowingShardsOnAllNodes);
assertOK(client().performRequest("POST", index + "/_refresh")); // this forces a global checkpoint sync
assertSeqNoOnShards(index, nodes, numDocs, newNodeClient);
Shard primary = buildShards(index, nodes, newNodeClient).stream().filter(Shard::isPrimary).findFirst().get();
logger.info("moving primary to new node by excluding {}", primary.getNode().getNodeName());
@ -235,7 +234,6 @@ public class IndexingIT extends ESRestTestCase {
logger.info("indexing [{}] docs after moving primary", numberOfDocsAfterMovingPrimary);
numDocsOnNewPrimary += indexDocs(index, numDocs, numberOfDocsAfterMovingPrimary);
numDocs += numberOfDocsAfterMovingPrimary;
assertOK(client().performRequest("POST", index + "/_refresh")); // this forces a global checkpoint sync
assertSeqNoOnShards(index, nodes, numDocs, newNodeClient);
/*
* Dropping the number of replicas to zero, and then increasing it to one triggers a recovery thus exercising any BWC-logic in

@ -160,7 +160,9 @@ public abstract class IndexShardTestCase extends ESTestCase {
* @param shardRouting the {@link ShardRouting} to use for this shard
* @param listeners an optional set of listeners to add to the shard
*/
protected IndexShard newShard(ShardRouting shardRouting, IndexingOperationListener... listeners) throws IOException {
protected IndexShard newShard(
final ShardRouting shardRouting,
final IndexingOperationListener... listeners) throws IOException {
assert shardRouting.initializing() : shardRouting;
Settings settings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
@ -197,9 +199,7 @@ public abstract class IndexShardTestCase extends ESTestCase {
*/
protected IndexShard newShard(ShardId shardId, boolean primary, String nodeId, IndexMetaData indexMetaData,
@Nullable IndexSearcherWrapper searcherWrapper) throws IOException {
ShardRouting shardRouting = TestShardRouting.newShardRouting(shardId, nodeId, primary, ShardRoutingState.INITIALIZING,
primary ? RecoverySource.StoreRecoverySource.EMPTY_STORE_INSTANCE : RecoverySource.PeerRecoverySource.INSTANCE);
return newShard(shardRouting, indexMetaData, searcherWrapper, null);
return newShard(shardId, primary, nodeId, indexMetaData, searcherWrapper, () -> {});
}
/**
@ -211,11 +211,10 @@ public abstract class IndexShardTestCase extends ESTestCase {
* (ready to recover from another shard)
*/
protected IndexShard newShard(ShardId shardId, boolean primary, String nodeId, IndexMetaData indexMetaData,
Runnable globalCheckpointSyncer,
@Nullable IndexSearcherWrapper searcherWrapper) throws IOException {
@Nullable IndexSearcherWrapper searcherWrapper, Runnable globalCheckpointSyncer) throws IOException {
ShardRouting shardRouting = TestShardRouting.newShardRouting(shardId, nodeId, primary, ShardRoutingState.INITIALIZING,
primary ? RecoverySource.StoreRecoverySource.EMPTY_STORE_INSTANCE : RecoverySource.PeerRecoverySource.INSTANCE);
return newShard(shardRouting, indexMetaData, searcherWrapper, null);
return newShard(shardRouting, indexMetaData, searcherWrapper, null, globalCheckpointSyncer);
}
@ -229,40 +228,45 @@ public abstract class IndexShardTestCase extends ESTestCase {
*/
protected IndexShard newShard(ShardRouting routing, IndexMetaData indexMetaData, IndexingOperationListener... listeners)
throws IOException {
return newShard(routing, indexMetaData, null, null, listeners);
return newShard(routing, indexMetaData, null, null, () -> {}, listeners);
}
/**
* creates a new initializing shard. The shard will be put in its proper path under the
* current node id the shard is assigned to.
* @param routing shard routing to use
* @param routing shard routing to use
* @param indexMetaData indexMetaData for the shard, including any mapping
* @param indexSearcherWrapper an optional wrapper to be applied to searchers
* @param globalCheckpointSyncer callback for syncing global checkpoints
* @param listeners an optional set of listeners to add to the shard
*/
protected IndexShard newShard(ShardRouting routing, IndexMetaData indexMetaData,
@Nullable IndexSearcherWrapper indexSearcherWrapper,
@Nullable EngineFactory engineFactory,
Runnable globalCheckpointSyncer,
IndexingOperationListener... listeners)
throws IOException {
// add node id as name to settings for proper logging
final ShardId shardId = routing.shardId();
final NodeEnvironment.NodePath nodePath = new NodeEnvironment.NodePath(createTempDir());
ShardPath shardPath = new ShardPath(false, nodePath.resolve(shardId), nodePath.resolve(shardId), shardId);
return newShard(routing, shardPath, indexMetaData, indexSearcherWrapper, engineFactory, listeners);
return newShard(routing, shardPath, indexMetaData, indexSearcherWrapper, engineFactory, globalCheckpointSyncer, listeners);
}
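Passing the globalCheckpointSyncer through these factory methods lets a unit test observe when a shard requests a global checkpoint sync. A hypothetical usage sketch (the counter and assertion are illustrative, not part of this commit):

    // count sync requests issued by the shard under test
    final AtomicInteger syncCalls = new AtomicInteger(); // java.util.concurrent.atomic
    final IndexShard shard = newShard(shardRouting, indexMetaData, null, null, syncCalls::incrementAndGet);
    // ... index documents and complete operations ...
    assertThat(syncCalls.get(), greaterThan(0)); // the shard asked for at least one sync
    closeShards(shard); // tests remain responsible for closing the shard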
/**
* creates a new initializing shard.
* @param routing shard routing to use
* @param shardPath path to use for shard data
* @param indexMetaData indexMetaData for the shard, including any mapping
* @param indexSearcherWrapper an optional wrapper to be applied to searchers
* @param listeners an optional set of listeners to add to the shard
*
* @param routing shard routing to use
* @param shardPath path to use for shard data
* @param indexMetaData indexMetaData for the shard, including any mapping
* @param indexSearcherWrapper an optional wrapper to be applied to searchers
* @param globalCheckpointSyncer callback for syncing global checkpoints
* @param listeners an optional set of listeners to add to the shard
*/
protected IndexShard newShard(ShardRouting routing, ShardPath shardPath, IndexMetaData indexMetaData,
@Nullable IndexSearcherWrapper indexSearcherWrapper,
@Nullable EngineFactory engineFactory,
Runnable globalCheckpointSyncer,
IndexingOperationListener... listeners) throws IOException {
final Settings nodeSettings = Settings.builder().put("node.name", routing.currentNodeId()).build();
final IndexSettings indexSettings = new IndexSettings(indexMetaData, nodeSettings);
@ -279,9 +283,9 @@ public abstract class IndexShardTestCase extends ESTestCase {
};
final Engine.Warmer warmer = searcher -> {
};
indexShard = new IndexShard(routing, indexSettings, shardPath, store, () ->null, indexCache, mapperService, similarityService,
indexShard = new IndexShard(routing, indexSettings, shardPath, store, () -> null, indexCache, mapperService, similarityService,
engineFactory, indexEventListener, indexSearcherWrapper, threadPool,
BigArrays.NON_RECYCLING_INSTANCE, warmer, Collections.emptyList(), Arrays.asList(listeners));
BigArrays.NON_RECYCLING_INSTANCE, warmer, Collections.emptyList(), Arrays.asList(listeners), globalCheckpointSyncer);
success = true;
} finally {
if (success == false) {
@ -311,7 +315,14 @@ public abstract class IndexShardTestCase extends ESTestCase {
*/
protected IndexShard reinitShard(IndexShard current, ShardRouting routing, IndexingOperationListener... listeners) throws IOException {
closeShards(current);
return newShard(routing, current.shardPath(), current.indexSettings().getIndexMetaData(), null, current.engineFactory, listeners);
return newShard(
routing,
current.shardPath(),
current.indexSettings().getIndexMetaData(),
null,
current.engineFactory,
current.getGlobalCheckpointSyncer(),
listeners);
}
/**

@ -19,6 +19,7 @@
package org.elasticsearch.test;
import com.carrotsearch.hppc.ObjectLongMap;
import com.carrotsearch.randomizedtesting.RandomizedContext;
import com.carrotsearch.randomizedtesting.annotations.TestGroup;
import com.carrotsearch.randomizedtesting.generators.RandomNumbers;
@ -49,6 +50,10 @@ import org.elasticsearch.action.admin.indices.segments.IndexSegments;
import org.elasticsearch.action.admin.indices.segments.IndexShardSegments;
import org.elasticsearch.action.admin.indices.segments.IndicesSegmentResponse;
import org.elasticsearch.action.admin.indices.segments.ShardSegments;
import org.elasticsearch.action.admin.indices.stats.IndexShardStats;
import org.elasticsearch.action.admin.indices.stats.IndexStats;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
import org.elasticsearch.action.admin.indices.stats.ShardStats;
import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequestBuilder;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
@ -69,6 +74,7 @@ import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
@ -114,6 +120,9 @@ import org.elasticsearch.index.MockEngineFactoryPlugin;
import org.elasticsearch.index.codec.CodecService;
import org.elasticsearch.index.engine.Segment;
import org.elasticsearch.index.mapper.DocumentMapper;
import org.elasticsearch.index.seqno.SeqNoStats;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.indices.IndicesQueryCache;
import org.elasticsearch.indices.IndicesRequestCache;
@ -161,6 +170,7 @@ import java.util.IdentityHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.Callable;
@ -191,6 +201,7 @@ import static org.hamcrest.Matchers.emptyIterable;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.startsWith;
@ -2194,4 +2205,44 @@ public abstract class ESIntegTestCase extends ESTestCase {
String uuid = getIndexResponse.getSettings().get(index).get(IndexMetaData.SETTING_INDEX_UUID);
return new Index(index, uuid);
}
protected void assertSeqNos() throws Exception {
assertBusy(() -> {
IndicesStatsResponse stats = client().admin().indices().prepareStats().clear().get();
for (IndexStats indexStats : stats.getIndices().values()) {
for (IndexShardStats indexShardStats : indexStats.getIndexShards().values()) {
Optional<ShardStats> maybePrimary = Stream.of(indexShardStats.getShards())
.filter(s -> s.getShardRouting().active() && s.getShardRouting().primary())
.findFirst();
if (maybePrimary.isPresent() == false) {
continue;
}
ShardStats primary = maybePrimary.get();
final SeqNoStats primarySeqNoStats = primary.getSeqNoStats();
final ShardRouting primaryShardRouting = primary.getShardRouting();
assertThat(primaryShardRouting + " should have set the global checkpoint",
primarySeqNoStats.getGlobalCheckpoint(), not(equalTo(SequenceNumbers.UNASSIGNED_SEQ_NO)));
final DiscoveryNode node = clusterService().state().nodes().get(primaryShardRouting.currentNodeId());
final IndicesService indicesService =
internalCluster().getInstance(IndicesService.class, node.getName());
final IndexShard indexShard = indicesService.getShardOrNull(primaryShardRouting.shardId());
final ObjectLongMap<String> globalCheckpoints = indexShard.getInSyncGlobalCheckpoints();
for (ShardStats shardStats : indexShardStats) {
final SeqNoStats seqNoStats = shardStats.getSeqNoStats();
assertThat(shardStats.getShardRouting() + " local checkpoint mismatch",
seqNoStats.getLocalCheckpoint(), equalTo(primarySeqNoStats.getLocalCheckpoint()));
assertThat(shardStats.getShardRouting() + " global checkpoint mismatch",
seqNoStats.getGlobalCheckpoint(), equalTo(primarySeqNoStats.getGlobalCheckpoint()));
assertThat(shardStats.getShardRouting() + " max seq no mismatch",
seqNoStats.getMaxSeqNo(), equalTo(primarySeqNoStats.getMaxSeqNo()));
// the local knowledge on the primary of the global checkpoint equals the global checkpoint on the shard
assertThat(
seqNoStats.getGlobalCheckpoint(),
equalTo(globalCheckpoints.get(shardStats.getShardRouting().allocationId().getId())));
}
}
}
});
}
}
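The new assertSeqNos helper busy-waits until, for every index, each copy reported in the stats has the same local checkpoint, global checkpoint, and maximum sequence number as its primary, and until the primary's in-sync tracking of each copy's global checkpoint agrees with the copy itself. A hypothetical caller (the test name and indexing calls are illustrative only):

    // an integration test ending with the new convergence assertion
    public void testReplicasConvergeOnGlobalCheckpoint() throws Exception {
        createIndex("test");
        ensureGreen("test");
        client().prepareIndex("test", "type", "1").setSource("field", "value").get();
        assertSeqNos(); // retries via assertBusy until all copies agree with the primary
    }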

@ -22,6 +22,7 @@ import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.plugins.Plugin;
import java.util.Arrays;
@ -44,7 +45,12 @@ public final class InternalSettingsPlugin extends Plugin {
@Override
public List<Setting<?>> getSettings() {
return Arrays.asList(VERSION_CREATED, MERGE_ENABLED,
INDEX_CREATION_DATE_SETTING, PROVIDED_NAME_SETTING, TRANSLOG_RETENTION_CHECK_INTERVAL_SETTING);
return Arrays.asList(
VERSION_CREATED,
MERGE_ENABLED,
INDEX_CREATION_DATE_SETTING,
PROVIDED_NAME_SETTING,
TRANSLOG_RETENTION_CHECK_INTERVAL_SETTING,
IndexService.GLOBAL_CHECKPOINT_SYNC_INTERVAL_SETTING);
}
}
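Registering IndexService.GLOBAL_CHECKPOINT_SYNC_INTERVAL_SETTING through this test-only plugin is what lets the tests above override the thirty-second default, since the interval is deliberately not exposed as a regular index setting. For orientation, an index-scoped time setting of this shape is typically declared along these lines (the key, bounds, and flags here are assumptions, not copied from this diff):

    // assumed shape of the declaration in IndexService; key and bounds are guesses
    public static final Setting<TimeValue> GLOBAL_CHECKPOINT_SYNC_INTERVAL_SETTING =
            Setting.timeSetting(
                    "index.global_checkpoint_sync.interval",  // assumed key
                    new TimeValue(30, TimeUnit.SECONDS),      // default per the commit message
                    new TimeValue(0, TimeUnit.MILLISECONDS),  // assumed lower bound
                    Property.Dynamic, Property.IndexScope);   // assumed flags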