diff --git a/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java b/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java index 3338f43ab4a..6b2997b1a97 100644 --- a/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java +++ b/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java @@ -382,8 +382,8 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple if (recoveryState.getSourceNode().equals(sourceNode) == false) { if (recoveryTargetService.cancelRecoveriesForShard(shardId, "recovery source node changed")) { // getting here means that the shard was still recovering - logger.debug("{} removing shard (recovery source changed), current [{}], global [{}])", - shardId, currentRoutingEntry, newShardRouting); + logger.debug("{} removing shard (recovery source changed), current [{}], global [{}], shard [{}])", + shardId, recoveryState.getSourceNode(), sourceNode, newShardRouting); indexService.removeShard(shardId.id(), "removing shard (recovery source node changed)"); } } diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveriesCollection.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveriesCollection.java index e1a197533e6..0d3ee87e3bb 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveriesCollection.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveriesCollection.java @@ -23,6 +23,7 @@ import org.elasticsearch.ElasticsearchTimeoutException; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.Callback; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.index.shard.IndexShard; @@ -31,7 +32,6 @@ import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; -import java.io.UncheckedIOException; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicBoolean; @@ -48,10 +48,12 @@ public class RecoveriesCollection { private final ESLogger logger; private final ThreadPool threadPool; + private final Callback ensureClusterStateVersionCallback; - public RecoveriesCollection(ESLogger logger, ThreadPool threadPool) { + public RecoveriesCollection(ESLogger logger, ThreadPool threadPool, Callback ensureClusterStateVersionCallback) { this.logger = logger; this.threadPool = threadPool; + this.ensureClusterStateVersionCallback = ensureClusterStateVersionCallback; } /** @@ -61,7 +63,7 @@ public class RecoveriesCollection { */ public long startRecovery(IndexShard indexShard, DiscoveryNode sourceNode, RecoveryTargetService.RecoveryListener listener, TimeValue activityTimeout) { - RecoveryTarget status = new RecoveryTarget(indexShard, sourceNode, listener); + RecoveryTarget status = new RecoveryTarget(indexShard, sourceNode, listener, ensureClusterStateVersionCallback); RecoveryTarget existingStatus = onGoingRecoveries.putIfAbsent(status.recoveryId(), status); assert existingStatus == null : "found two RecoveryStatus instances with the same id"; logger.trace("{} started recovery from {}, id [{}]", indexShard.shardId(), sourceNode, status.recoveryId()); @@ -175,7 +177,6 @@ public class RecoveriesCollection { return cancelled; } - /** * a reference to {@link RecoveryTarget}, which implements {@link AutoCloseable}. closing the reference * causes {@link RecoveryTarget#decRef()} to be called. This makes sure that the underlying resources diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySource.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySource.java index 77fedd8184f..c17b7e2a251 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySource.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySource.java @@ -26,6 +26,7 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.IndexService; import org.elasticsearch.index.shard.IndexEventListener; @@ -44,6 +45,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.function.Supplier; /** * The source recovery accepts recovery requests from other peer shards and start the recovery process from this @@ -101,13 +103,7 @@ public class RecoverySource extends AbstractComponent implements IndexEventListe throw new DelayRecoveryException("source shard is not marked yet as relocating to [" + request.targetNode() + "]"); } - ShardRouting targetShardRouting = null; - for (ShardRouting shardRouting : node) { - if (shardRouting.shardId().equals(request.shardId())) { - targetShardRouting = shardRouting; - break; - } - } + ShardRouting targetShardRouting = node.getByShardId(request.shardId()); if (targetShardRouting == null) { logger.debug("delaying recovery of {} as it is not listed as assigned to target node {}", request.shardId(), request.targetNode()); throw new DelayRecoveryException("source node does not have the shard listed in its state as allocated on the node"); @@ -118,17 +114,8 @@ public class RecoverySource extends AbstractComponent implements IndexEventListe throw new DelayRecoveryException("source node has the state of the target shard to be [" + targetShardRouting.state() + "], expecting to be [initializing]"); } + RecoverySourceHandler handler = ongoingRecoveries.addNewRecovery(request, shard); logger.trace("[{}][{}] starting recovery to {}", request.shardId().getIndex().getName(), request.shardId().id(), request.targetNode()); - final RecoverySourceHandler handler; - final RemoteRecoveryTargetHandler recoveryTarget = - new RemoteRecoveryTargetHandler(request.recoveryId(), request.shardId(), transportService, request.targetNode(), - recoverySettings, throttleTime -> shard.recoveryStats().addThrottleTime(throttleTime)); - if (shard.indexSettings().isOnSharedFilesystem()) { - handler = new SharedFSRecoverySourceHandler(shard, recoveryTarget, request, logger); - } else { - handler = new RecoverySourceHandler(shard, recoveryTarget, request, recoverySettings.getChunkSize().bytesAsInt(), logger); - } - ongoingRecoveries.add(shard, handler); try { return handler.recoverToTarget(); } finally { @@ -144,38 +131,35 @@ public class RecoverySource extends AbstractComponent implements IndexEventListe } } - private static final class OngoingRecoveries { - private final Map> ongoingRecoveries = new HashMap<>(); + private final class OngoingRecoveries { + private final Map ongoingRecoveries = new HashMap<>(); - synchronized void add(IndexShard shard, RecoverySourceHandler handler) { - Set shardRecoveryHandlers = ongoingRecoveries.get(shard); - if (shardRecoveryHandlers == null) { - shardRecoveryHandlers = new HashSet<>(); - ongoingRecoveries.put(shard, shardRecoveryHandlers); - } - assert shardRecoveryHandlers.contains(handler) == false : "Handler was already registered [" + handler + "]"; - shardRecoveryHandlers.add(handler); + synchronized RecoverySourceHandler addNewRecovery(StartRecoveryRequest request, IndexShard shard) { + final ShardRecoveryContext shardContext = ongoingRecoveries.computeIfAbsent(shard, s -> new ShardRecoveryContext()); + RecoverySourceHandler handler = shardContext.addNewRecovery(request, shard); shard.recoveryStats().incCurrentAsSource(); + return handler; } synchronized void remove(IndexShard shard, RecoverySourceHandler handler) { - final Set shardRecoveryHandlers = ongoingRecoveries.get(shard); - assert shardRecoveryHandlers != null : "Shard was not registered [" + shard + "]"; - boolean remove = shardRecoveryHandlers.remove(handler); + final ShardRecoveryContext shardRecoveryContext = ongoingRecoveries.get(shard); + assert shardRecoveryContext != null : "Shard was not registered [" + shard + "]"; + boolean remove = shardRecoveryContext.recoveryHandlers.remove(handler); assert remove : "Handler was not registered [" + handler + "]"; if (remove) { shard.recoveryStats().decCurrentAsSource(); } - if (shardRecoveryHandlers.isEmpty()) { + if (shardRecoveryContext.recoveryHandlers.isEmpty()) { ongoingRecoveries.remove(shard); + assert shardRecoveryContext.onNewRecoveryException == null; } } synchronized void cancel(IndexShard shard, String reason) { - final Set shardRecoveryHandlers = ongoingRecoveries.get(shard); - if (shardRecoveryHandlers != null) { + final ShardRecoveryContext shardRecoveryContext = ongoingRecoveries.get(shard); + if (shardRecoveryContext != null) { final List failures = new ArrayList<>(); - for (RecoverySourceHandler handlers : shardRecoveryHandlers) { + for (RecoverySourceHandler handlers : shardRecoveryContext.recoveryHandlers) { try { handlers.cancel(reason); } catch (Exception ex) { @@ -187,6 +171,60 @@ public class RecoverySource extends AbstractComponent implements IndexEventListe ExceptionsHelper.maybeThrowRuntimeAndSuppress(failures); } } + + private final class ShardRecoveryContext { + final Set recoveryHandlers = new HashSet<>(); + + @Nullable + private DelayRecoveryException onNewRecoveryException; + + /** + * Adds recovery source handler if recoveries are not delayed from starting (see also {@link #delayNewRecoveries}. + * Throws {@link DelayRecoveryException} if new recoveries are delayed from starting. + */ + synchronized RecoverySourceHandler addNewRecovery(StartRecoveryRequest request, IndexShard shard) { + if (onNewRecoveryException != null) { + throw onNewRecoveryException; + } + RecoverySourceHandler handler = createRecoverySourceHandler(request, shard); + recoveryHandlers.add(handler); + return handler; + } + + private RecoverySourceHandler createRecoverySourceHandler(StartRecoveryRequest request, IndexShard shard) { + RecoverySourceHandler handler; + final RemoteRecoveryTargetHandler recoveryTarget = + new RemoteRecoveryTargetHandler(request.recoveryId(), request.shardId(), transportService, request.targetNode(), + recoverySettings, throttleTime -> shard.recoveryStats().addThrottleTime(throttleTime)); + Supplier currentClusterStateVersionSupplier = () -> clusterService.state().getVersion(); + if (shard.indexSettings().isOnSharedFilesystem()) { + handler = new SharedFSRecoverySourceHandler(shard, recoveryTarget, request, currentClusterStateVersionSupplier, + this::delayNewRecoveries, logger); + } else { + handler = new RecoverySourceHandler(shard, recoveryTarget, request, currentClusterStateVersionSupplier, + this::delayNewRecoveries, recoverySettings.getChunkSize().bytesAsInt(), logger); + } + return handler; + } + + /** + * Makes new recoveries throw a {@link DelayRecoveryException} with the provided message. + * + * Throws {@link IllegalStateException} if new recoveries are already being delayed. + */ + synchronized Releasable delayNewRecoveries(String exceptionMessage) throws IllegalStateException { + if (onNewRecoveryException != null) { + throw new IllegalStateException("already delaying recoveries"); + } + onNewRecoveryException = new DelayRecoveryException(exceptionMessage); + return this::unblockNewRecoveries; + } + + + private synchronized void unblockNewRecoveries() { + onNewRecoveryException = null; + } + } } } diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index 6c126c58397..7d201a3ca78 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -34,6 +34,7 @@ import org.elasticsearch.common.Nullable; import org.elasticsearch.common.StopWatch; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.io.Streams; +import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.lucene.store.InputStreamIndexInput; import org.elasticsearch.common.unit.ByteSizeValue; @@ -41,6 +42,7 @@ import org.elasticsearch.common.util.CancellableThreads; import org.elasticsearch.index.engine.RecoveryEngineException; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardClosedException; +import org.elasticsearch.index.shard.IndexShardRelocatedException; import org.elasticsearch.index.shard.IndexShardState; import org.elasticsearch.index.store.Store; import org.elasticsearch.index.store.StoreFileMetaData; @@ -53,6 +55,7 @@ import java.io.OutputStream; import java.util.ArrayList; import java.util.List; import java.util.function.Function; +import java.util.function.Supplier; import java.util.stream.StreamSupport; /** @@ -75,6 +78,8 @@ public class RecoverySourceHandler { private final int shardId; // Request containing source and target node information private final StartRecoveryRequest request; + private final Supplier currentClusterStateVersionSupplier; + private final Function delayNewRecoveries; private final int chunkSizeInBytes; private final RecoveryTargetHandler recoveryTarget; @@ -98,11 +103,15 @@ public class RecoverySourceHandler { public RecoverySourceHandler(final IndexShard shard, RecoveryTargetHandler recoveryTarget, final StartRecoveryRequest request, + final Supplier currentClusterStateVersionSupplier, + Function delayNewRecoveries, final int fileChunkSizeInBytes, final ESLogger logger) { this.shard = shard; this.recoveryTarget = recoveryTarget; this.request = request; + this.currentClusterStateVersionSupplier = currentClusterStateVersionSupplier; + this.delayNewRecoveries = delayNewRecoveries; this.logger = logger; this.indexName = this.request.shardId().getIndex().getName(); this.shardId = this.request.shardId().id(); @@ -136,6 +145,22 @@ public class RecoverySourceHandler { } } + // engine was just started at the end of phase 1 + if (shard.state() == IndexShardState.RELOCATED) { + /** + * The primary shard has been relocated while we copied files. This means that we can't guarantee any more that all + * operations that were replicated during the file copy (when the target engine was not yet opened) will be present in the + * local translog and thus will be resent on phase 2. The reason is that an operation replicated by the target primary is + * sent to the recovery target and the local shard (old primary) concurrently, meaning it may have arrived at the recovery + * target before we opened the engine and is still in-flight on the local shard. + * + * Checking the relocated status here, after we opened the engine on the target, is safe because primary relocation waits + * for all ongoing operations to complete and be fully replicated. Therefore all future operation by the new primary are + * guaranteed to reach the target shard when it's engine is open. + */ + throw new IndexShardRelocatedException(request.shardId()); + } + logger.trace("{} snapshot translog for recovery. current size is [{}]", shard.shardId(), translogView.totalOperations()); try { phase2(translogView.snapshot()); @@ -362,12 +387,18 @@ public class RecoverySourceHandler { cancellableThreads.execute(recoveryTarget::finalizeRecovery); if (isPrimaryRelocation()) { - logger.trace("[{}][{}] performing relocation hand-off to {}", indexName, shardId, request.targetNode()); - try { + // in case of primary relocation we have to ensure that the cluster state on the primary relocation target has all + // replica shards that have recovered or are still recovering from the current primary, otherwise replication actions + // will not be send to these replicas. To accomplish this, first block new recoveries, then take version of latest cluster + // state. This means that no new recovery can be completed based on information of a newer cluster state than the current one. + try (Releasable ignored = delayNewRecoveries.apply("primary relocation hand-off in progress or completed for " + shardId)) { + final long currentClusterStateVersion = currentClusterStateVersionSupplier.get(); + logger.trace("[{}][{}] waiting on {} to have cluster state with version [{}]", indexName, shardId, request.targetNode(), + currentClusterStateVersion); + cancellableThreads.execute(() -> recoveryTarget.ensureClusterStateVersion(currentClusterStateVersion)); + + logger.trace("[{}][{}] performing relocation hand-off to {}", indexName, shardId, request.targetNode()); cancellableThreads.execute(() -> shard.relocated("to " + request.targetNode())); - } catch (Exception e) { - logger.debug("[{}][{}] completing relocation hand-off to {} failed", e, indexName, shardId, request.targetNode()); - throw e; } /** * if the recovery process fails after setting the shard state to RELOCATED, both relocation source and diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java index 683b0a87eac..4a503b7dad0 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java @@ -34,6 +34,7 @@ import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.lucene.Lucene; +import org.elasticsearch.common.util.Callback; import org.elasticsearch.common.util.CancellableThreads; import org.elasticsearch.common.util.concurrent.AbstractRefCounted; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; @@ -56,10 +57,9 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; /** - * + * Represents a recovery where the current node is the target node of the recovery. To track recoveries in a central place, instances of + * this class are created through {@link RecoveriesCollection}. */ - - public class RecoveryTarget extends AbstractRefCounted implements RecoveryTargetHandler { private final ESLogger logger; @@ -75,6 +75,7 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget private final String tempFilePrefix; private final Store store; private final RecoveryTargetService.RecoveryListener listener; + private final Callback ensureClusterStateVersionCallback; private final AtomicBoolean finished = new AtomicBoolean(); @@ -87,15 +88,26 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget private final Map tempFileNames = ConcurrentCollections.newConcurrentMap(); private RecoveryTarget(RecoveryTarget copyFrom) { // copy constructor - this(copyFrom.indexShard, copyFrom.sourceNode, copyFrom.listener, copyFrom.cancellableThreads, copyFrom.recoveryId); + this(copyFrom.indexShard, copyFrom.sourceNode, copyFrom.listener, copyFrom.cancellableThreads, copyFrom.recoveryId, + copyFrom.ensureClusterStateVersionCallback); } - public RecoveryTarget(IndexShard indexShard, DiscoveryNode sourceNode, RecoveryTargetService.RecoveryListener listener) { - this(indexShard, sourceNode, listener, new CancellableThreads(), idGenerator.incrementAndGet()); + public RecoveryTarget(IndexShard indexShard, DiscoveryNode sourceNode, RecoveryTargetService.RecoveryListener listener, + Callback ensureClusterStateVersionCallback) { + this(indexShard, sourceNode, listener, new CancellableThreads(), idGenerator.incrementAndGet(), ensureClusterStateVersionCallback); } - + /** + * creates a new recovery target object that represents a recovery to the provided indexShard + * + * @param indexShard local shard where we want to recover to + * @param sourceNode source node of the recovery where we recover from + * @param listener called when recovery is completed / failed + * @param ensureClusterStateVersionCallback callback to ensure that the current node is at least on a cluster state with the provided + * version. Necessary for primary relocation so that new primary knows about all other ongoing + * replica recoveries when replicating documents (see {@link RecoverySourceHandler}). + */ private RecoveryTarget(IndexShard indexShard, DiscoveryNode sourceNode, RecoveryTargetService.RecoveryListener listener, - CancellableThreads cancellableThreads, long recoveryId) { + CancellableThreads cancellableThreads, long recoveryId, Callback ensureClusterStateVersionCallback) { super("recovery_status"); this.cancellableThreads = cancellableThreads; this.recoveryId = recoveryId; @@ -106,6 +118,7 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget this.shardId = indexShard.shardId(); this.tempFilePrefix = RECOVERY_PREFIX + UUIDs.base64UUID() + "."; this.store = indexShard.store(); + this.ensureClusterStateVersionCallback = ensureClusterStateVersionCallback; // make sure the store is not released until we are done. store.incRef(); indexShard.recoveryStats().incCurrentAsTarget(); @@ -321,6 +334,11 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget indexShard().finalizeRecovery(); } + @Override + public void ensureClusterStateVersion(long clusterStateVersion) { + ensureClusterStateVersionCallback.handle(clusterStateVersion); + } + @Override public void indexTranslogOperations(List operations, int totalTranslogOps) throws TranslogRecoveryPerformer .BatchOperationException { diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java index 4772e2d0a8b..3d7e4f29c35 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java @@ -43,6 +43,11 @@ public interface RecoveryTargetHandler { **/ void finalizeRecovery(); + /** + * Blockingly waits for cluster state with at least clusterStateVersion to be available + */ + void ensureClusterStateVersion(long clusterStateVersion); + /** * Index a set of translog operations on the target * @param operations operations to index diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetService.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetService.java index e641d199ab4..a23b8060b17 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetService.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetService.java @@ -24,6 +24,7 @@ import org.apache.lucene.store.RateLimiter; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchTimeoutException; import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateObserver; import org.elasticsearch.cluster.node.DiscoveryNode; @@ -42,11 +43,11 @@ import org.elasticsearch.index.mapper.MapperException; import org.elasticsearch.index.shard.IllegalIndexShardStateException; import org.elasticsearch.index.shard.IndexEventListener; import org.elasticsearch.index.shard.IndexShard; -import org.elasticsearch.index.shard.IndexShardClosedException; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardNotFoundException; import org.elasticsearch.index.shard.TranslogRecoveryPerformer; import org.elasticsearch.index.store.Store; +import org.elasticsearch.node.NodeClosedException; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.ConnectTransportException; import org.elasticsearch.transport.FutureTransportResponseHandler; @@ -76,6 +77,7 @@ public class RecoveryTargetService extends AbstractComponent implements IndexEve public static final String TRANSLOG_OPS = "internal:index/shard/recovery/translog_ops"; public static final String PREPARE_TRANSLOG = "internal:index/shard/recovery/prepare_translog"; public static final String FINALIZE = "internal:index/shard/recovery/finalize"; + public static final String WAIT_CLUSTERSTATE = "internal:index/shard/recovery/wait_clusterstate"; } private final ThreadPool threadPool; @@ -95,7 +97,7 @@ public class RecoveryTargetService extends AbstractComponent implements IndexEve this.transportService = transportService; this.recoverySettings = recoverySettings; this.clusterService = clusterService; - this.onGoingRecoveries = new RecoveriesCollection(logger, threadPool); + this.onGoingRecoveries = new RecoveriesCollection(logger, threadPool, this::waitForClusterState); transportService.registerRequestHandler(Actions.FILES_INFO, RecoveryFilesInfoRequest::new, ThreadPool.Names.GENERIC, new FilesInfoRequestHandler()); @@ -109,6 +111,8 @@ public class RecoveryTargetService extends AbstractComponent implements IndexEve new TranslogOperationsRequestHandler()); transportService.registerRequestHandler(Actions.FINALIZE, RecoveryFinalizeRecoveryRequest::new, ThreadPool.Names.GENERIC, new FinalizeRecoveryRequestHandler()); + transportService.registerRequestHandler(Actions.WAIT_CLUSTERSTATE, RecoveryWaitForClusterStateRequest::new, + ThreadPool.Names.GENERIC, new WaitForClusterStateRequestHandler()); } @Override @@ -301,6 +305,18 @@ public class RecoveryTargetService extends AbstractComponent implements IndexEve } } + class WaitForClusterStateRequestHandler implements TransportRequestHandler { + + @Override + public void messageReceived(RecoveryWaitForClusterStateRequest request, TransportChannel channel) throws Exception { + try (RecoveriesCollection.RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId() + )) { + recoveryRef.status().ensureClusterStateVersion(request.clusterStateVersion()); + } + channel.sendResponse(TransportResponse.Empty.INSTANCE); + } + } + class TranslogOperationsRequestHandler implements TransportRequestHandler { @Override @@ -362,6 +378,52 @@ public class RecoveryTargetService extends AbstractComponent implements IndexEve } } + private void waitForClusterState(long clusterStateVersion) { + ClusterStateObserver observer = new ClusterStateObserver(clusterService, TimeValue.timeValueMinutes(5), logger, + threadPool.getThreadContext()); + final ClusterState clusterState = observer.observedState(); + if (clusterState.getVersion() >= clusterStateVersion) { + logger.trace("node has cluster state with version higher than {} (current: {})", clusterStateVersion, + clusterState.getVersion()); + return; + } else { + logger.trace("waiting for cluster state version {} (current: {})", clusterStateVersion, clusterState.getVersion()); + final PlainActionFuture future = new PlainActionFuture<>(); + observer.waitForNextChange(new ClusterStateObserver.Listener() { + + @Override + public void onNewClusterState(ClusterState state) { + future.onResponse(null); + } + + @Override + public void onClusterServiceClose() { + future.onFailure(new NodeClosedException(clusterService.localNode())); + } + + @Override + public void onTimeout(TimeValue timeout) { + future.onFailure(new IllegalStateException("cluster state never updated to version " + clusterStateVersion)); + } + }, new ClusterStateObserver.ValidationPredicate() { + + @Override + protected boolean validate(ClusterState newState) { + return newState.getVersion() >= clusterStateVersion; + } + }); + try { + future.get(); + logger.trace("successfully waited for cluster state with version {} (current: {})", clusterStateVersion, + observer.observedState().getVersion()); + } catch (Exception e) { + logger.debug("failed waiting for cluster state with version {} (current: {})", e, clusterStateVersion, + observer.observedState()); + throw ExceptionsHelper.convertToRuntime(e); + } + } + } + class FilesInfoRequestHandler implements TransportRequestHandler { @Override diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryWaitForClusterStateRequest.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryWaitForClusterStateRequest.java new file mode 100644 index 00000000000..e45cf3f7d16 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryWaitForClusterStateRequest.java @@ -0,0 +1,70 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.indices.recovery; + +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.transport.TransportRequest; + +import java.io.IOException; + +public class RecoveryWaitForClusterStateRequest extends TransportRequest { + + private long recoveryId; + private ShardId shardId; + private long clusterStateVersion; + + public RecoveryWaitForClusterStateRequest() { + } + + RecoveryWaitForClusterStateRequest(long recoveryId, ShardId shardId, long clusterStateVersion) { + this.recoveryId = recoveryId; + this.shardId = shardId; + this.clusterStateVersion = clusterStateVersion; + } + + public long recoveryId() { + return this.recoveryId; + } + + public ShardId shardId() { + return shardId; + } + + public long clusterStateVersion() { + return clusterStateVersion; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + recoveryId = in.readLong(); + shardId = ShardId.readShardId(in); + clusterStateVersion = in.readVLong(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeLong(recoveryId); + shardId.writeTo(out); + out.writeVLong(clusterStateVersion); + } +} diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java b/core/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java index edc6c520be0..32cbce55199 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java @@ -89,6 +89,14 @@ public class RemoteRecoveryTargetHandler implements RecoveryTargetHandler { EmptyTransportResponseHandler.INSTANCE_SAME).txGet(); } + @Override + public void ensureClusterStateVersion(long clusterStateVersion) { + transportService.submitRequest(targetNode, RecoveryTargetService.Actions.WAIT_CLUSTERSTATE, + new RecoveryWaitForClusterStateRequest(recoveryId, shardId, clusterStateVersion), + TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionLongTimeout()).build(), + EmptyTransportResponseHandler.INSTANCE_SAME).txGet(); + } + @Override public void indexTranslogOperations(List operations, int totalTranslogOps) { final RecoveryTranslogOperationsRequest translogOperationsRequest = new RecoveryTranslogOperationsRequest( diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/SharedFSRecoverySourceHandler.java b/core/src/main/java/org/elasticsearch/indices/recovery/SharedFSRecoverySourceHandler.java index 6794475e0a3..67492affafb 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/SharedFSRecoverySourceHandler.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/SharedFSRecoverySourceHandler.java @@ -19,11 +19,14 @@ package org.elasticsearch.indices.recovery; +import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.translog.Translog; import java.io.IOException; +import java.util.function.Function; +import java.util.function.Supplier; /** * A recovery handler that skips phase 1 as well as sending the snapshot. During phase 3 the shard is marked @@ -34,9 +37,10 @@ public class SharedFSRecoverySourceHandler extends RecoverySourceHandler { private final IndexShard shard; private final StartRecoveryRequest request; - public SharedFSRecoverySourceHandler(IndexShard shard, RecoveryTargetHandler recoveryTarget, StartRecoveryRequest request, ESLogger - logger) { - super(shard, recoveryTarget, request, -1, logger); + public SharedFSRecoverySourceHandler(IndexShard shard, RecoveryTargetHandler recoveryTarget, StartRecoveryRequest request, + Supplier currentClusterStateVersionSupplier, + Function delayNewRecoveries, ESLogger logger) { + super(shard, recoveryTarget, request, currentClusterStateVersionSupplier, delayNewRecoveries, -1, logger); this.shard = shard; this.request = request; } diff --git a/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java b/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java index d7afd095d01..62a09584d84 100644 --- a/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java +++ b/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java @@ -139,7 +139,7 @@ public abstract class ESIndexLevelReplicationTestCase extends ESTestCase { new BiFunction() { @Override public RecoveryTarget apply(IndexShard indexShard, DiscoveryNode node) { - return new RecoveryTarget(indexShard, node, recoveryListener) { + return new RecoveryTarget(indexShard, node, recoveryListener, version -> {}) { @Override public void renameAllTempFiles() throws IOException { super.renameAllTempFiles(); @@ -274,7 +274,8 @@ public abstract class ESIndexLevelReplicationTestCase extends ESTestCase { primary.recoverFromStore(); primary.updateRoutingEntry(ShardRoutingHelper.moveToStarted(primary.routingEntry())); for (IndexShard replicaShard : replicas) { - recoverReplica(replicaShard, (replica, sourceNode) -> new RecoveryTarget(replica, sourceNode, recoveryListener)); + recoverReplica(replicaShard, + (replica, sourceNode) -> new RecoveryTarget(replica, sourceNode, recoveryListener, version -> {})); } } @@ -302,8 +303,8 @@ public abstract class ESIndexLevelReplicationTestCase extends ESTestCase { RecoveryTarget recoveryTarget = targetSupplier.apply(replica, pNode); StartRecoveryRequest request = new StartRecoveryRequest(replica.shardId(), pNode, rNode, replica.store().getMetadataOrEmpty(), RecoveryState.Type.REPLICA, 0); - RecoverySourceHandler recovery = new RecoverySourceHandler(primary, recoveryTarget, request, (int) ByteSizeUnit.MB.toKB(1), - logger); + RecoverySourceHandler recovery = new RecoverySourceHandler(primary, recoveryTarget, request, () -> 0L, e -> () -> {}, + (int) ByteSizeUnit.MB.toKB(1), logger); recovery.recoverToTarget(); recoveryTarget.markAsDone(); replica.updateRoutingEntry(ShardRoutingHelper.moveToStarted(replica.routingEntry())); diff --git a/core/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java b/core/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java index 815884edf51..3e26e3018b2 100644 --- a/core/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java +++ b/core/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java @@ -66,7 +66,7 @@ public class RecoveryDuringReplicationTests extends ESIndexLevelReplicationTestC BlockingTarget(RecoveryState.Stage stageToBlock, CountDownLatch recoveryBlocked, CountDownLatch releaseRecovery, IndexShard shard, DiscoveryNode sourceNode, RecoveryTargetService.RecoveryListener listener, ESLogger logger) { - super(shard, sourceNode, listener); + super(shard, sourceNode, listener, version -> {}); this.recoveryBlocked = recoveryBlocked; this.releaseRecovery = releaseRecovery; this.stageToBlock = stageToBlock; diff --git a/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java b/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java index 2c52cd33015..cfff28121ba 100644 --- a/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java +++ b/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java @@ -24,6 +24,7 @@ import org.apache.lucene.document.StringField; import org.apache.lucene.document.TextField; import org.apache.lucene.index.CorruptIndexException; import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.IndexCommit; import org.apache.lucene.index.IndexReader; import org.apache.lucene.index.RandomIndexWriter; import org.apache.lucene.store.BaseDirectoryWrapper; @@ -35,15 +36,20 @@ import org.elasticsearch.Version; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.io.FileSystemUtils; +import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.lucene.store.IndexOutputOutputStream; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.LocalTransportAddress; import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.shard.IndexShardRelocatedException; +import org.elasticsearch.index.shard.IndexShardState; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.store.DirectoryService; import org.elasticsearch.index.store.Store; import org.elasticsearch.index.store.StoreFileMetaData; +import org.elasticsearch.index.translog.Translog; import org.elasticsearch.test.CorruptionUtils; import org.elasticsearch.test.DummyShardLock; import org.elasticsearch.test.ESTestCase; @@ -55,9 +61,15 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Function; +import java.util.function.Supplier; import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; public class RecoverySourceHandlerTests extends ESTestCase { private static final IndexSettings INDEX_SETTINGS = IndexSettingsModule.newIndexSettings("index", Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, org.elasticsearch.Version.CURRENT).build()); @@ -73,8 +85,8 @@ public class RecoverySourceHandlerTests extends ESTestCase { new DiscoveryNode("b", LocalTransportAddress.buildUnique(), emptyMap(), emptySet(), Version.CURRENT), null, RecoveryState.Type.STORE, randomLong()); Store store = newStore(createTempDir()); - RecoverySourceHandler handler = new RecoverySourceHandler(null, null, request, recoverySettings.getChunkSize().bytesAsInt(), - logger); + RecoverySourceHandler handler = new RecoverySourceHandler(null, null, request, () -> 0L, e -> () -> {}, + recoverySettings.getChunkSize().bytesAsInt(), logger); Directory dir = store.directory(); RandomIndexWriter writer = new RandomIndexWriter(random(), dir, newIndexWriterConfig()); int numDocs = randomIntBetween(10, 100); @@ -125,7 +137,8 @@ public class RecoverySourceHandlerTests extends ESTestCase { Path tempDir = createTempDir(); Store store = newStore(tempDir, false); AtomicBoolean failedEngine = new AtomicBoolean(false); - RecoverySourceHandler handler = new RecoverySourceHandler(null, null, request, recoverySettings.getChunkSize().bytesAsInt(), logger) { + RecoverySourceHandler handler = new RecoverySourceHandler(null, null, request, () -> 0L, e -> () -> {}, + recoverySettings.getChunkSize().bytesAsInt(), logger) { @Override protected void failEngine(IOException cause) { assertFalse(failedEngine.get()); @@ -188,7 +201,8 @@ public class RecoverySourceHandlerTests extends ESTestCase { Path tempDir = createTempDir(); Store store = newStore(tempDir, false); AtomicBoolean failedEngine = new AtomicBoolean(false); - RecoverySourceHandler handler = new RecoverySourceHandler(null, null, request, recoverySettings.getChunkSize().bytesAsInt(), logger) { + RecoverySourceHandler handler = new RecoverySourceHandler(null, null, request, () -> 0L, e -> () -> {}, + recoverySettings.getChunkSize().bytesAsInt(), logger) { @Override protected void failEngine(IOException cause) { assertFalse(failedEngine.get()); @@ -237,6 +251,99 @@ public class RecoverySourceHandlerTests extends ESTestCase { IOUtils.close(store, targetStore); } + public void testThrowExceptionOnPrimaryRelocatedBeforePhase1Completed() throws IOException { + final RecoverySettings recoverySettings = new RecoverySettings(Settings.EMPTY, service); + StartRecoveryRequest request = new StartRecoveryRequest(shardId, + new DiscoveryNode("b", LocalTransportAddress.buildUnique(), emptyMap(), emptySet(), Version.CURRENT), + new DiscoveryNode("b", LocalTransportAddress.buildUnique(), emptyMap(), emptySet(), Version.CURRENT), + null, RecoveryState.Type.REPLICA, randomLong()); + IndexShard shard = mock(IndexShard.class); + Translog.View translogView = mock(Translog.View.class); + when(shard.acquireTranslogView()).thenReturn(translogView); + when(shard.state()).thenReturn(IndexShardState.RELOCATED); + AtomicBoolean phase1Called = new AtomicBoolean(); + AtomicBoolean phase2Called = new AtomicBoolean(); + RecoverySourceHandler handler = new RecoverySourceHandler(shard, null, request, () -> 0L, e -> () -> {}, + recoverySettings.getChunkSize().bytesAsInt(), logger) { + + @Override + public void phase1(final IndexCommit snapshot, final Translog.View translogView) { + phase1Called.set(true); + } + + @Override + public void phase2(Translog.Snapshot snapshot) { + phase2Called.set(true); + } + }; + expectThrows(IndexShardRelocatedException.class, () -> handler.recoverToTarget()); + assertTrue(phase1Called.get()); + assertFalse(phase2Called.get()); + } + + public void testWaitForClusterStateOnPrimaryRelocation() throws IOException, InterruptedException { + final RecoverySettings recoverySettings = new RecoverySettings(Settings.EMPTY, service); + StartRecoveryRequest request = new StartRecoveryRequest(shardId, + new DiscoveryNode("b", LocalTransportAddress.buildUnique(), emptyMap(), emptySet(), Version.CURRENT), + new DiscoveryNode("b", LocalTransportAddress.buildUnique(), emptyMap(), emptySet(), Version.CURRENT), + null, RecoveryState.Type.PRIMARY_RELOCATION, randomLong()); + AtomicBoolean phase1Called = new AtomicBoolean(); + AtomicBoolean phase2Called = new AtomicBoolean(); + AtomicBoolean ensureClusterStateVersionCalled = new AtomicBoolean(); + AtomicBoolean recoveriesDelayed = new AtomicBoolean(); + AtomicBoolean relocated = new AtomicBoolean(); + + IndexShard shard = mock(IndexShard.class); + Translog.View translogView = mock(Translog.View.class); + when(shard.acquireTranslogView()).thenReturn(translogView); + when(shard.state()).then(i -> relocated.get() ? IndexShardState.RELOCATED : IndexShardState.STARTED); + doAnswer(i -> { + relocated.set(true); + assertTrue(recoveriesDelayed.get()); + return null; + }).when(shard).relocated(any(String.class)); + + RecoveryTargetHandler targetHandler = mock(RecoveryTargetHandler.class); + + final Supplier currentClusterStateVersionSupplier = () -> { + assertFalse(ensureClusterStateVersionCalled.get()); + assertTrue(recoveriesDelayed.get()); + ensureClusterStateVersionCalled.set(true); + return 0L; + }; + final Function delayNewRecoveries = s -> { + assertTrue(phase1Called.get()); + assertTrue(phase2Called.get()); + + assertFalse(recoveriesDelayed.get()); + recoveriesDelayed.set(true); + return () -> { + assertTrue(recoveriesDelayed.get()); + recoveriesDelayed.set(false); + }; + }; + + RecoverySourceHandler handler = new RecoverySourceHandler(shard, targetHandler, request, currentClusterStateVersionSupplier, + delayNewRecoveries, recoverySettings.getChunkSize().bytesAsInt(), logger) { + + @Override + public void phase1(final IndexCommit snapshot, final Translog.View translogView) { + phase1Called.set(true); + } + + @Override + public void phase2(Translog.Snapshot snapshot) { + phase2Called.set(true); + } + }; + handler.recoverToTarget(); + assertTrue(ensureClusterStateVersionCalled.get()); + assertTrue(phase1Called.get()); + assertTrue(phase2Called.get()); + assertTrue(relocated.get()); + assertFalse(recoveriesDelayed.get()); + } + private Store newStore(Path path) throws IOException { return newStore(path, true); } diff --git a/core/src/test/java/org/elasticsearch/indices/recovery/RecoveryStatusTests.java b/core/src/test/java/org/elasticsearch/indices/recovery/RecoveryStatusTests.java index fde14c4825c..587dc35bc50 100644 --- a/core/src/test/java/org/elasticsearch/indices/recovery/RecoveryStatusTests.java +++ b/core/src/test/java/org/elasticsearch/indices/recovery/RecoveryStatusTests.java @@ -51,7 +51,7 @@ public class RecoveryStatusTests extends ESSingleNodeTestCase { @Override public void onRecoveryFailure(RecoveryState state, RecoveryFailedException e, boolean sendShardFailure) { } - }); + }, version -> {}); try (IndexOutput indexOutput = status.openAndPutIndexOutput("foo.bar", new StoreFileMetaData("foo.bar", 8 + CodecUtil.footerLength(), "9z51nw"), status.store())) { indexOutput.writeInt(1); IndexOutput openIndexOutput = status.getOpenIndexOutput("foo.bar"); diff --git a/core/src/test/java/org/elasticsearch/recovery/RecoveriesCollectionTests.java b/core/src/test/java/org/elasticsearch/recovery/RecoveriesCollectionTests.java index bc323ecec8f..582be02d457 100644 --- a/core/src/test/java/org/elasticsearch/recovery/RecoveriesCollectionTests.java +++ b/core/src/test/java/org/elasticsearch/recovery/RecoveriesCollectionTests.java @@ -19,34 +19,20 @@ package org.elasticsearch.recovery; import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.Version; -import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.routing.TestShardRouting; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.transport.LocalTransportAddress; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.replication.ESIndexLevelReplicationTestCase; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.recovery.RecoveriesCollection; import org.elasticsearch.indices.recovery.RecoveryFailedException; import org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.indices.recovery.RecoveryTargetService; -import org.elasticsearch.test.ESSingleNodeTestCase; -import org.elasticsearch.threadpool.TestThreadPool; -import org.elasticsearch.threadpool.ThreadPool; -import org.junit.After; -import org.junit.Before; -import java.io.IOException; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import static java.util.Collections.emptyMap; -import static java.util.Collections.emptySet; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.lessThan; @@ -65,7 +51,7 @@ public class RecoveriesCollectionTests extends ESIndexLevelReplicationTestCase { public void testLastAccessTimeUpdate() throws Exception { try (ReplicationGroup shards = createGroup(0)) { - final RecoveriesCollection collection = new RecoveriesCollection(logger, threadPool); + final RecoveriesCollection collection = new RecoveriesCollection(logger, threadPool, v -> {}); final long recoveryId = startRecovery(collection, shards.getPrimaryNode(), shards.addReplica()); try (RecoveriesCollection.RecoveryRef status = collection.getRecovery(recoveryId)) { final long lastSeenTime = status.status().lastAccessTime(); @@ -82,22 +68,22 @@ public class RecoveriesCollectionTests extends ESIndexLevelReplicationTestCase { public void testRecoveryTimeout() throws Exception { try (ReplicationGroup shards = createGroup(0)) { - final RecoveriesCollection collection = new RecoveriesCollection(logger, threadPool); + final RecoveriesCollection collection = new RecoveriesCollection(logger, threadPool, v -> {}); final AtomicBoolean failed = new AtomicBoolean(); final CountDownLatch latch = new CountDownLatch(1); final long recoveryId = startRecovery(collection, shards.getPrimaryNode(), shards.addReplica(), - new RecoveryTargetService.RecoveryListener() { - @Override - public void onRecoveryDone(RecoveryState state) { - latch.countDown(); - } + new RecoveryTargetService.RecoveryListener() { + @Override + public void onRecoveryDone(RecoveryState state) { + latch.countDown(); + } - @Override - public void onRecoveryFailure(RecoveryState state, RecoveryFailedException e, boolean sendShardFailure) { - failed.set(true); - latch.countDown(); - } - }, TimeValue.timeValueMillis(100)); + @Override + public void onRecoveryFailure(RecoveryState state, RecoveryFailedException e, boolean sendShardFailure) { + failed.set(true); + latch.countDown(); + } + }, TimeValue.timeValueMillis(100)); try { latch.await(30, TimeUnit.SECONDS); assertTrue("recovery failed to timeout", failed.get()); @@ -110,7 +96,7 @@ public class RecoveriesCollectionTests extends ESIndexLevelReplicationTestCase { public void testRecoveryCancellation() throws Exception { try (ReplicationGroup shards = createGroup(0)) { - final RecoveriesCollection collection = new RecoveriesCollection(logger, threadPool); + final RecoveriesCollection collection = new RecoveriesCollection(logger, threadPool, v -> {}); final long recoveryId = startRecovery(collection, shards.getPrimaryNode(), shards.addReplica()); final long recoveryId2 = startRecovery(collection, shards.getPrimaryNode(), shards.addReplica()); try (RecoveriesCollection.RecoveryRef recoveryRef = collection.getRecovery(recoveryId)) { @@ -129,7 +115,7 @@ public class RecoveriesCollectionTests extends ESIndexLevelReplicationTestCase { shards.startAll(); int numDocs = randomIntBetween(1, 15); shards.indexDocs(numDocs); - final RecoveriesCollection collection = new RecoveriesCollection(logger, threadPool); + final RecoveriesCollection collection = new RecoveriesCollection(logger, threadPool, v -> {}); IndexShard shard = shards.addReplica(); final long recoveryId = startRecovery(collection, shards.getPrimaryNode(), shard); try (RecoveriesCollection.RecoveryRef recovery = collection.getRecovery(recoveryId)) { diff --git a/core/src/test/java/org/elasticsearch/recovery/RelocationIT.java b/core/src/test/java/org/elasticsearch/recovery/RelocationIT.java index 901f4717560..c95f57d20dc 100644 --- a/core/src/test/java/org/elasticsearch/recovery/RelocationIT.java +++ b/core/src/test/java/org/elasticsearch/recovery/RelocationIT.java @@ -444,7 +444,7 @@ public class RelocationIT extends ESIntegTestCase { assertAcked(prepareCreate("test").setSettings(Settings.builder() .put("index.routing.allocation.exclude.color", "blue") .put(indexSettings()) - .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0) // NORELEASE: set to randomInt(halfNodes - 1) once replica data loss is fixed + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, randomInt(halfNodes - 1)) )); assertAllShardsOnNodes("test", redFuture.get().toArray(new String[2])); int numDocs = randomIntBetween(100, 150);