Fix replica-primary inconsistencies when indexing during primary relocation with ongoing replica recoveries (#19287)

Primary relocation violates two invariants that ensure proper interaction between document replication and peer recoveries, ultimately leading to documents not being properly replicated.

Invariant 1: Document writes must be replicated based on the routing table of a cluster state that includes all shards which have ongoing or finished recoveries. This is ensured by the fact that we do not start a recovery that is not reflected by the cluster state available on the primary node, and that we always sample a fresh cluster state before starting to replicate write operations.
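
To make this concrete, here is a minimal sketch of the invariant, assuming hypothetical names (ShardCopy, freshClusterStateRoutingTable); it is not the actual Elasticsearch API:

import java.util.List;
import java.util.function.Supplier;

// A minimal sketch of invariant 1; names are illustrative only.
final class InvariantOneSketch {
    interface ShardCopy {
        void replicate(String operation);
    }

    private final Supplier<List<ShardCopy>> freshClusterStateRoutingTable;

    InvariantOneSketch(Supplier<List<ShardCopy>> freshClusterStateRoutingTable) {
        this.freshClusterStateRoutingTable = freshClusterStateRoutingTable;
    }

    void index(String operation) {
        // Sample a fresh cluster state before replicating. Because recoveries are
        // only started once they are reflected in the primary's cluster state,
        // every ongoing or finished recovery is included in this fan-out.
        for (ShardCopy copy : freshClusterStateRoutingTable.get()) {
            copy.replicate(operation);
        }
    }
}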

Invariant 2: Every operation that is not part of the snapshot taken for phase 2 must be successfully indexed on the target replica (pending shard-level errors, which will cause the target shard to be failed). To ensure this, we start replicating to the target shard as soon as the recovery starts, and we open its engine before we take the snapshot. All operations that are indexed after the snapshot was taken are guaranteed to arrive at the shard when it is ready to index them. Note that this also means that replication does not fail a shard if it is not yet ready to receive operations - that is a normal part of a recovering shard.
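
A minimal sketch of that ordering, again with hypothetical names rather than the real recovery classes:

import java.util.List;
import java.util.function.Supplier;

// A minimal sketch of the ordering behind invariant 2.
final class InvariantTwoSketch {
    interface Target {
        void startReceivingReplicatedOps(); // may receive ops before the engine is open; that is expected
        void openEngine();
        void replay(List<String> snapshotOps); // phase 2: replay the translog snapshot
    }

    void recover(Target target, Runnable copyFiles, Supplier<List<String>> snapshotTranslog) {
        target.startReceivingReplicatedOps();  // replication to the target starts with the recovery
        copyFiles.run();                       // phase 1: copy index files
        target.openEngine();                   // the engine opens before the snapshot is taken ...
        target.replay(snapshotTranslog.get()); // ... so ops indexed after the snapshot arrive at a ready engine
    }
}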

With primary relocation, both invariants can be violated. Let's consider a primary that is relocating while another replica shard is recovering from that primary.

Invariant 1 can be violated if the target of the primary relocation lags so far behind on cluster state processing that it does not even know about the new initializing replica. This is very rare in practice, as replica recoveries take time to copy all the index files, but it is a theoretical gap that surfaces in testing scenarios.

Invariant 2 can be violated even if the target primary knows about the initializing replica. This can happen if the target primary replicates an operation to the initializing shard, and that operation arrives at the initializing shard before it opens its engine, but reaches the relocation source (the old primary) only after it has taken the translog snapshot. Such operations are currently missed on the new initializing replica.
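
To make the race concrete, the problematic interleaving looks like this (p = relocation source, p' = relocation target primary, r = initializing replica):

t1: p' replicates operation X to r and to p concurrently
t2: X arrives at r - the engine on r is not open yet, so X is not indexed (normally harmless, as X would be part of p's phase 2 snapshot)
t3: p takes the translog snapshot for r's phase 2
t4: X arrives at p - too late to be included in the snapshot

X is neither replayed in phase 2 nor indexed live, so it is missing on r.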

The fix to reestablish invariant 1 is to ensure that the primary relocation target has a cluster state that includes all replica recoveries that were successfully started on the primary relocation source. The fix to reestablish invariant 2 is to check, after opening the engine on the replica, whether the primary has been relocated in the meantime, and to fail the recovery if so.
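
The two fixes can be sketched side by side as follows; the names are hypothetical, and the actual changes are in RecoverySourceHandler and RecoveryTargetService in the diff below:

// A hypothetical sketch of both fixes; not the actual Elasticsearch code.
final class RelocationFixesSketch {
    interface RecoveryBlocker {
        AutoCloseable delayNewRecoveries(String reason); // new recoveries are rejected while held
    }

    interface RelocationTarget {
        void waitForClusterStateVersion(long version); // blocks until the target has caught up
    }

    // Fix for invariant 1: block new recoveries, then make the relocation target
    // catch up to the current cluster state version before handing off, so the new
    // primary knows about every replica recovery started on the old primary.
    void handOff(RecoveryBlocker blocker, RelocationTarget target, long currentClusterStateVersion) throws Exception {
        try (AutoCloseable ignored = blocker.delayNewRecoveries("primary relocation hand-off in progress")) {
            target.waitForClusterStateVersion(currentClusterStateVersion);
            // ... mark the local shard as RELOCATED and complete the hand-off ...
        }
    }

    // Fix for invariant 2: on the recovery source, after the target engine has been
    // opened and before taking the phase 2 snapshot, fail the recovery if the
    // primary has been relocated in the meantime; the recovery is then retried.
    void beforePhase2Snapshot(boolean shardRelocated) {
        if (shardRelocated) {
            throw new IllegalStateException("primary relocated during file copy, failing recovery");
        }
    }
}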

Closes #19248
Yannick Welsch 2016-07-19 14:07:58 +02:00 committed by GitHub
parent f79fb4ada7
commit c4fe8e7bf2
16 changed files with 429 additions and 98 deletions

IndicesClusterStateService.java

@ -382,8 +382,8 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
if (recoveryState.getSourceNode().equals(sourceNode) == false) {
if (recoveryTargetService.cancelRecoveriesForShard(shardId, "recovery source node changed")) {
// getting here means that the shard was still recovering
logger.debug("{} removing shard (recovery source changed), current [{}], global [{}])",
shardId, currentRoutingEntry, newShardRouting);
logger.debug("{} removing shard (recovery source changed), current [{}], global [{}], shard [{}])",
shardId, recoveryState.getSourceNode(), sourceNode, newShardRouting);
indexService.removeShard(shardId.id(), "removing shard (recovery source node changed)");
}
}

RecoveriesCollection.java

@ -23,6 +23,7 @@ import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.Callback;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.index.shard.IndexShard;
@ -31,7 +32,6 @@ import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
@ -48,10 +48,12 @@ public class RecoveriesCollection {
private final ESLogger logger;
private final ThreadPool threadPool;
private final Callback<Long> ensureClusterStateVersionCallback;
public RecoveriesCollection(ESLogger logger, ThreadPool threadPool) {
public RecoveriesCollection(ESLogger logger, ThreadPool threadPool, Callback<Long> ensureClusterStateVersionCallback) {
this.logger = logger;
this.threadPool = threadPool;
this.ensureClusterStateVersionCallback = ensureClusterStateVersionCallback;
}
/**
@ -61,7 +63,7 @@ public class RecoveriesCollection {
*/
public long startRecovery(IndexShard indexShard, DiscoveryNode sourceNode,
RecoveryTargetService.RecoveryListener listener, TimeValue activityTimeout) {
RecoveryTarget status = new RecoveryTarget(indexShard, sourceNode, listener);
RecoveryTarget status = new RecoveryTarget(indexShard, sourceNode, listener, ensureClusterStateVersionCallback);
RecoveryTarget existingStatus = onGoingRecoveries.putIfAbsent(status.recoveryId(), status);
assert existingStatus == null : "found two RecoveryStatus instances with the same id";
logger.trace("{} started recovery from {}, id [{}]", indexShard.shardId(), sourceNode, status.recoveryId());
@ -175,7 +177,6 @@ public class RecoveriesCollection {
return cancelled;
}
/**
* a reference to {@link RecoveryTarget}, which implements {@link AutoCloseable}. closing the reference
* causes {@link RecoveryTarget#decRef()} to be called. This makes sure that the underlying resources

RecoverySource.java

@ -26,6 +26,7 @@ import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.shard.IndexEventListener;
@ -44,6 +45,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;
/**
* The source recovery accepts recovery requests from other peer shards and start the recovery process from this
@ -101,13 +103,7 @@ public class RecoverySource extends AbstractComponent implements IndexEventListe
throw new DelayRecoveryException("source shard is not marked yet as relocating to [" + request.targetNode() + "]");
}
ShardRouting targetShardRouting = null;
for (ShardRouting shardRouting : node) {
if (shardRouting.shardId().equals(request.shardId())) {
targetShardRouting = shardRouting;
break;
}
}
ShardRouting targetShardRouting = node.getByShardId(request.shardId());
if (targetShardRouting == null) {
logger.debug("delaying recovery of {} as it is not listed as assigned to target node {}", request.shardId(), request.targetNode());
throw new DelayRecoveryException("source node does not have the shard listed in its state as allocated on the node");
@ -118,17 +114,8 @@ public class RecoverySource extends AbstractComponent implements IndexEventListe
throw new DelayRecoveryException("source node has the state of the target shard to be [" + targetShardRouting.state() + "], expecting to be [initializing]");
}
RecoverySourceHandler handler = ongoingRecoveries.addNewRecovery(request, shard);
logger.trace("[{}][{}] starting recovery to {}", request.shardId().getIndex().getName(), request.shardId().id(), request.targetNode());
final RecoverySourceHandler handler;
final RemoteRecoveryTargetHandler recoveryTarget =
new RemoteRecoveryTargetHandler(request.recoveryId(), request.shardId(), transportService, request.targetNode(),
recoverySettings, throttleTime -> shard.recoveryStats().addThrottleTime(throttleTime));
if (shard.indexSettings().isOnSharedFilesystem()) {
handler = new SharedFSRecoverySourceHandler(shard, recoveryTarget, request, logger);
} else {
handler = new RecoverySourceHandler(shard, recoveryTarget, request, recoverySettings.getChunkSize().bytesAsInt(), logger);
}
ongoingRecoveries.add(shard, handler);
try {
return handler.recoverToTarget();
} finally {
@ -144,38 +131,35 @@ public class RecoverySource extends AbstractComponent implements IndexEventListe
}
}
private static final class OngoingRecoveries {
private final Map<IndexShard, Set<RecoverySourceHandler>> ongoingRecoveries = new HashMap<>();
private final class OngoingRecoveries {
private final Map<IndexShard, ShardRecoveryContext> ongoingRecoveries = new HashMap<>();
synchronized void add(IndexShard shard, RecoverySourceHandler handler) {
Set<RecoverySourceHandler> shardRecoveryHandlers = ongoingRecoveries.get(shard);
if (shardRecoveryHandlers == null) {
shardRecoveryHandlers = new HashSet<>();
ongoingRecoveries.put(shard, shardRecoveryHandlers);
}
assert shardRecoveryHandlers.contains(handler) == false : "Handler was already registered [" + handler + "]";
shardRecoveryHandlers.add(handler);
synchronized RecoverySourceHandler addNewRecovery(StartRecoveryRequest request, IndexShard shard) {
final ShardRecoveryContext shardContext = ongoingRecoveries.computeIfAbsent(shard, s -> new ShardRecoveryContext());
RecoverySourceHandler handler = shardContext.addNewRecovery(request, shard);
shard.recoveryStats().incCurrentAsSource();
return handler;
}
synchronized void remove(IndexShard shard, RecoverySourceHandler handler) {
final Set<RecoverySourceHandler> shardRecoveryHandlers = ongoingRecoveries.get(shard);
assert shardRecoveryHandlers != null : "Shard was not registered [" + shard + "]";
boolean remove = shardRecoveryHandlers.remove(handler);
final ShardRecoveryContext shardRecoveryContext = ongoingRecoveries.get(shard);
assert shardRecoveryContext != null : "Shard was not registered [" + shard + "]";
boolean remove = shardRecoveryContext.recoveryHandlers.remove(handler);
assert remove : "Handler was not registered [" + handler + "]";
if (remove) {
shard.recoveryStats().decCurrentAsSource();
}
if (shardRecoveryHandlers.isEmpty()) {
if (shardRecoveryContext.recoveryHandlers.isEmpty()) {
ongoingRecoveries.remove(shard);
assert shardRecoveryContext.onNewRecoveryException == null;
}
}
synchronized void cancel(IndexShard shard, String reason) {
final Set<RecoverySourceHandler> shardRecoveryHandlers = ongoingRecoveries.get(shard);
if (shardRecoveryHandlers != null) {
final ShardRecoveryContext shardRecoveryContext = ongoingRecoveries.get(shard);
if (shardRecoveryContext != null) {
final List<Exception> failures = new ArrayList<>();
for (RecoverySourceHandler handlers : shardRecoveryHandlers) {
for (RecoverySourceHandler handlers : shardRecoveryContext.recoveryHandlers) {
try {
handlers.cancel(reason);
} catch (Exception ex) {
@ -187,6 +171,60 @@ public class RecoverySource extends AbstractComponent implements IndexEventListe
ExceptionsHelper.maybeThrowRuntimeAndSuppress(failures);
}
}
private final class ShardRecoveryContext {
final Set<RecoverySourceHandler> recoveryHandlers = new HashSet<>();
@Nullable
private DelayRecoveryException onNewRecoveryException;
/**
* Adds a recovery source handler if recoveries are not delayed from starting (see also {@link #delayNewRecoveries}).
* Throws {@link DelayRecoveryException} if new recoveries are delayed from starting.
*/
synchronized RecoverySourceHandler addNewRecovery(StartRecoveryRequest request, IndexShard shard) {
if (onNewRecoveryException != null) {
throw onNewRecoveryException;
}
RecoverySourceHandler handler = createRecoverySourceHandler(request, shard);
recoveryHandlers.add(handler);
return handler;
}
private RecoverySourceHandler createRecoverySourceHandler(StartRecoveryRequest request, IndexShard shard) {
RecoverySourceHandler handler;
final RemoteRecoveryTargetHandler recoveryTarget =
new RemoteRecoveryTargetHandler(request.recoveryId(), request.shardId(), transportService, request.targetNode(),
recoverySettings, throttleTime -> shard.recoveryStats().addThrottleTime(throttleTime));
Supplier<Long> currentClusterStateVersionSupplier = () -> clusterService.state().getVersion();
if (shard.indexSettings().isOnSharedFilesystem()) {
handler = new SharedFSRecoverySourceHandler(shard, recoveryTarget, request, currentClusterStateVersionSupplier,
this::delayNewRecoveries, logger);
} else {
handler = new RecoverySourceHandler(shard, recoveryTarget, request, currentClusterStateVersionSupplier,
this::delayNewRecoveries, recoverySettings.getChunkSize().bytesAsInt(), logger);
}
return handler;
}
/**
* Makes new recoveries throw a {@link DelayRecoveryException} with the provided message.
*
* Throws {@link IllegalStateException} if new recoveries are already being delayed.
*/
synchronized Releasable delayNewRecoveries(String exceptionMessage) throws IllegalStateException {
if (onNewRecoveryException != null) {
throw new IllegalStateException("already delaying recoveries");
}
onNewRecoveryException = new DelayRecoveryException(exceptionMessage);
return this::unblockNewRecoveries;
}
private synchronized void unblockNewRecoveries() {
onNewRecoveryException = null;
}
}
}
}

RecoverySourceHandler.java

@ -34,6 +34,7 @@ import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.StopWatch;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.lucene.store.InputStreamIndexInput;
import org.elasticsearch.common.unit.ByteSizeValue;
@ -41,6 +42,7 @@ import org.elasticsearch.common.util.CancellableThreads;
import org.elasticsearch.index.engine.RecoveryEngineException;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardClosedException;
import org.elasticsearch.index.shard.IndexShardRelocatedException;
import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.store.StoreFileMetaData;
@ -53,6 +55,7 @@ import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.StreamSupport;
/**
@ -75,6 +78,8 @@ public class RecoverySourceHandler {
private final int shardId;
// Request containing source and target node information
private final StartRecoveryRequest request;
private final Supplier<Long> currentClusterStateVersionSupplier;
private final Function<String, Releasable> delayNewRecoveries;
private final int chunkSizeInBytes;
private final RecoveryTargetHandler recoveryTarget;
@ -98,11 +103,15 @@ public class RecoverySourceHandler {
public RecoverySourceHandler(final IndexShard shard, RecoveryTargetHandler recoveryTarget,
final StartRecoveryRequest request,
final Supplier<Long> currentClusterStateVersionSupplier,
Function<String, Releasable> delayNewRecoveries,
final int fileChunkSizeInBytes,
final ESLogger logger) {
this.shard = shard;
this.recoveryTarget = recoveryTarget;
this.request = request;
this.currentClusterStateVersionSupplier = currentClusterStateVersionSupplier;
this.delayNewRecoveries = delayNewRecoveries;
this.logger = logger;
this.indexName = this.request.shardId().getIndex().getName();
this.shardId = this.request.shardId().id();
@ -136,6 +145,22 @@ public class RecoverySourceHandler {
}
}
// engine was just started at the end of phase 1
if (shard.state() == IndexShardState.RELOCATED) {
/**
* The primary shard has been relocated while we copied files. This means that we can't guarantee any more that all
* operations that were replicated during the file copy (when the target engine was not yet opened) will be present in the
* local translog and thus will be resent on phase 2. The reason is that an operation replicated by the target primary is
* sent to the recovery target and the local shard (old primary) concurrently, meaning it may have arrived at the recovery
* target before we opened the engine and is still in-flight on the local shard.
*
* Checking the relocated status here, after we opened the engine on the target, is safe because primary relocation waits
* for all ongoing operations to complete and be fully replicated. Therefore all future operations by the new primary are
* guaranteed to reach the target shard when its engine is open.
*/
throw new IndexShardRelocatedException(request.shardId());
}
logger.trace("{} snapshot translog for recovery. current size is [{}]", shard.shardId(), translogView.totalOperations());
try {
phase2(translogView.snapshot());
@ -362,12 +387,18 @@ public class RecoverySourceHandler {
cancellableThreads.execute(recoveryTarget::finalizeRecovery);
if (isPrimaryRelocation()) {
// in case of primary relocation we have to ensure that the cluster state on the primary relocation target has all
// replica shards that have recovered or are still recovering from the current primary, otherwise replication actions
// will not be sent to these replicas. To accomplish this, we first block new recoveries, then take the version of the latest cluster
// state. This means that no new recovery can be completed based on information of a newer cluster state than the current one.
try (Releasable ignored = delayNewRecoveries.apply("primary relocation hand-off in progress or completed for " + shardId)) {
final long currentClusterStateVersion = currentClusterStateVersionSupplier.get();
logger.trace("[{}][{}] waiting on {} to have cluster state with version [{}]", indexName, shardId, request.targetNode(),
currentClusterStateVersion);
cancellableThreads.execute(() -> recoveryTarget.ensureClusterStateVersion(currentClusterStateVersion));
logger.trace("[{}][{}] performing relocation hand-off to {}", indexName, shardId, request.targetNode());
try {
cancellableThreads.execute(() -> shard.relocated("to " + request.targetNode()));
} catch (Exception e) {
logger.debug("[{}][{}] completing relocation hand-off to {} failed", e, indexName, shardId, request.targetNode());
throw e;
}
/**
* if the recovery process fails after setting the shard state to RELOCATED, both relocation source and

RecoveryTarget.java

@ -34,6 +34,7 @@ import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.util.Callback;
import org.elasticsearch.common.util.CancellableThreads;
import org.elasticsearch.common.util.concurrent.AbstractRefCounted;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
@ -56,10 +57,9 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
/**
*
* Represents a recovery where the current node is the target node of the recovery. To track recoveries in a central place, instances of
* this class are created through {@link RecoveriesCollection}.
*/
public class RecoveryTarget extends AbstractRefCounted implements RecoveryTargetHandler {
private final ESLogger logger;
@ -75,6 +75,7 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget
private final String tempFilePrefix;
private final Store store;
private final RecoveryTargetService.RecoveryListener listener;
private final Callback<Long> ensureClusterStateVersionCallback;
private final AtomicBoolean finished = new AtomicBoolean();
@ -87,15 +88,26 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget
private final Map<String, String> tempFileNames = ConcurrentCollections.newConcurrentMap();
private RecoveryTarget(RecoveryTarget copyFrom) { // copy constructor
this(copyFrom.indexShard, copyFrom.sourceNode, copyFrom.listener, copyFrom.cancellableThreads, copyFrom.recoveryId);
this(copyFrom.indexShard, copyFrom.sourceNode, copyFrom.listener, copyFrom.cancellableThreads, copyFrom.recoveryId,
copyFrom.ensureClusterStateVersionCallback);
}
public RecoveryTarget(IndexShard indexShard, DiscoveryNode sourceNode, RecoveryTargetService.RecoveryListener listener) {
this(indexShard, sourceNode, listener, new CancellableThreads(), idGenerator.incrementAndGet());
public RecoveryTarget(IndexShard indexShard, DiscoveryNode sourceNode, RecoveryTargetService.RecoveryListener listener,
Callback<Long> ensureClusterStateVersionCallback) {
this(indexShard, sourceNode, listener, new CancellableThreads(), idGenerator.incrementAndGet(), ensureClusterStateVersionCallback);
}
/**
* creates a new recovery target object that represents a recovery to the provided indexShard
*
* @param indexShard local shard where we want to recover to
* @param sourceNode source node of the recovery where we recover from
* @param listener called when recovery is completed / failed
* @param ensureClusterStateVersionCallback callback to ensure that the current node is at least on a cluster state with the provided
* version. Necessary for primary relocation so that new primary knows about all other ongoing
* replica recoveries when replicating documents (see {@link RecoverySourceHandler}).
*/
private RecoveryTarget(IndexShard indexShard, DiscoveryNode sourceNode, RecoveryTargetService.RecoveryListener listener,
CancellableThreads cancellableThreads, long recoveryId) {
CancellableThreads cancellableThreads, long recoveryId, Callback<Long> ensureClusterStateVersionCallback) {
super("recovery_status");
this.cancellableThreads = cancellableThreads;
this.recoveryId = recoveryId;
@ -106,6 +118,7 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget
this.shardId = indexShard.shardId();
this.tempFilePrefix = RECOVERY_PREFIX + UUIDs.base64UUID() + ".";
this.store = indexShard.store();
this.ensureClusterStateVersionCallback = ensureClusterStateVersionCallback;
// make sure the store is not released until we are done.
store.incRef();
indexShard.recoveryStats().incCurrentAsTarget();
@ -321,6 +334,11 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget
indexShard().finalizeRecovery();
}
@Override
public void ensureClusterStateVersion(long clusterStateVersion) {
ensureClusterStateVersionCallback.handle(clusterStateVersion);
}
@Override
public void indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps) throws TranslogRecoveryPerformer
.BatchOperationException {

RecoveryTargetHandler.java

@ -43,6 +43,11 @@ public interface RecoveryTargetHandler {
**/
void finalizeRecovery();
/**
* Blockingly waits for cluster state with at least clusterStateVersion to be available
*/
void ensureClusterStateVersion(long clusterStateVersion);
/**
* Index a set of translog operations on the target
* @param operations operations to index

RecoveryTargetService.java

@ -24,6 +24,7 @@ import org.apache.lucene.store.RateLimiter;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateObserver;
import org.elasticsearch.cluster.node.DiscoveryNode;
@ -42,11 +43,11 @@ import org.elasticsearch.index.mapper.MapperException;
import org.elasticsearch.index.shard.IllegalIndexShardStateException;
import org.elasticsearch.index.shard.IndexEventListener;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardClosedException;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardNotFoundException;
import org.elasticsearch.index.shard.TranslogRecoveryPerformer;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.node.NodeClosedException;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.FutureTransportResponseHandler;
@ -76,6 +77,7 @@ public class RecoveryTargetService extends AbstractComponent implements IndexEve
public static final String TRANSLOG_OPS = "internal:index/shard/recovery/translog_ops";
public static final String PREPARE_TRANSLOG = "internal:index/shard/recovery/prepare_translog";
public static final String FINALIZE = "internal:index/shard/recovery/finalize";
public static final String WAIT_CLUSTERSTATE = "internal:index/shard/recovery/wait_clusterstate";
}
private final ThreadPool threadPool;
@ -95,7 +97,7 @@ public class RecoveryTargetService extends AbstractComponent implements IndexEve
this.transportService = transportService;
this.recoverySettings = recoverySettings;
this.clusterService = clusterService;
this.onGoingRecoveries = new RecoveriesCollection(logger, threadPool);
this.onGoingRecoveries = new RecoveriesCollection(logger, threadPool, this::waitForClusterState);
transportService.registerRequestHandler(Actions.FILES_INFO, RecoveryFilesInfoRequest::new, ThreadPool.Names.GENERIC, new
FilesInfoRequestHandler());
@ -109,6 +111,8 @@ public class RecoveryTargetService extends AbstractComponent implements IndexEve
new TranslogOperationsRequestHandler());
transportService.registerRequestHandler(Actions.FINALIZE, RecoveryFinalizeRecoveryRequest::new, ThreadPool.Names.GENERIC, new
FinalizeRecoveryRequestHandler());
transportService.registerRequestHandler(Actions.WAIT_CLUSTERSTATE, RecoveryWaitForClusterStateRequest::new,
ThreadPool.Names.GENERIC, new WaitForClusterStateRequestHandler());
}
@Override
@ -301,6 +305,18 @@ public class RecoveryTargetService extends AbstractComponent implements IndexEve
}
}
class WaitForClusterStateRequestHandler implements TransportRequestHandler<RecoveryWaitForClusterStateRequest> {
@Override
public void messageReceived(RecoveryWaitForClusterStateRequest request, TransportChannel channel) throws Exception {
try (RecoveriesCollection.RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId()
)) {
recoveryRef.status().ensureClusterStateVersion(request.clusterStateVersion());
}
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}
}
class TranslogOperationsRequestHandler implements TransportRequestHandler<RecoveryTranslogOperationsRequest> {
@Override
@ -362,6 +378,52 @@ public class RecoveryTargetService extends AbstractComponent implements IndexEve
}
}
private void waitForClusterState(long clusterStateVersion) {
ClusterStateObserver observer = new ClusterStateObserver(clusterService, TimeValue.timeValueMinutes(5), logger,
threadPool.getThreadContext());
final ClusterState clusterState = observer.observedState();
if (clusterState.getVersion() >= clusterStateVersion) {
logger.trace("node has cluster state with version higher than {} (current: {})", clusterStateVersion,
clusterState.getVersion());
return;
} else {
logger.trace("waiting for cluster state version {} (current: {})", clusterStateVersion, clusterState.getVersion());
final PlainActionFuture<Void> future = new PlainActionFuture<>();
observer.waitForNextChange(new ClusterStateObserver.Listener() {
@Override
public void onNewClusterState(ClusterState state) {
future.onResponse(null);
}
@Override
public void onClusterServiceClose() {
future.onFailure(new NodeClosedException(clusterService.localNode()));
}
@Override
public void onTimeout(TimeValue timeout) {
future.onFailure(new IllegalStateException("cluster state never updated to version " + clusterStateVersion));
}
}, new ClusterStateObserver.ValidationPredicate() {
@Override
protected boolean validate(ClusterState newState) {
return newState.getVersion() >= clusterStateVersion;
}
});
try {
future.get();
logger.trace("successfully waited for cluster state with version {} (current: {})", clusterStateVersion,
observer.observedState().getVersion());
} catch (Exception e) {
logger.debug("failed waiting for cluster state with version {} (current: {})", e, clusterStateVersion,
observer.observedState());
throw ExceptionsHelper.convertToRuntime(e);
}
}
}
class FilesInfoRequestHandler implements TransportRequestHandler<RecoveryFilesInfoRequest> {
@Override

RecoveryWaitForClusterStateRequest.java (new file)

@ -0,0 +1,70 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.indices.recovery;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.transport.TransportRequest;
import java.io.IOException;
public class RecoveryWaitForClusterStateRequest extends TransportRequest {
private long recoveryId;
private ShardId shardId;
private long clusterStateVersion;
public RecoveryWaitForClusterStateRequest() {
}
RecoveryWaitForClusterStateRequest(long recoveryId, ShardId shardId, long clusterStateVersion) {
this.recoveryId = recoveryId;
this.shardId = shardId;
this.clusterStateVersion = clusterStateVersion;
}
public long recoveryId() {
return this.recoveryId;
}
public ShardId shardId() {
return shardId;
}
public long clusterStateVersion() {
return clusterStateVersion;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
recoveryId = in.readLong();
shardId = ShardId.readShardId(in);
clusterStateVersion = in.readVLong();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeLong(recoveryId);
shardId.writeTo(out);
out.writeVLong(clusterStateVersion);
}
}

RemoteRecoveryTargetHandler.java

@ -89,6 +89,14 @@ public class RemoteRecoveryTargetHandler implements RecoveryTargetHandler {
EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
}
@Override
public void ensureClusterStateVersion(long clusterStateVersion) {
transportService.submitRequest(targetNode, RecoveryTargetService.Actions.WAIT_CLUSTERSTATE,
new RecoveryWaitForClusterStateRequest(recoveryId, shardId, clusterStateVersion),
TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionLongTimeout()).build(),
EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
}
@Override
public void indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps) {
final RecoveryTranslogOperationsRequest translogOperationsRequest = new RecoveryTranslogOperationsRequest(

SharedFSRecoverySourceHandler.java

@ -19,11 +19,14 @@
package org.elasticsearch.indices.recovery;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.translog.Translog;
import java.io.IOException;
import java.util.function.Function;
import java.util.function.Supplier;
/**
* A recovery handler that skips phase 1 as well as sending the snapshot. During phase 3 the shard is marked
@ -34,9 +37,10 @@ public class SharedFSRecoverySourceHandler extends RecoverySourceHandler {
private final IndexShard shard;
private final StartRecoveryRequest request;
public SharedFSRecoverySourceHandler(IndexShard shard, RecoveryTargetHandler recoveryTarget, StartRecoveryRequest request, ESLogger
logger) {
super(shard, recoveryTarget, request, -1, logger);
public SharedFSRecoverySourceHandler(IndexShard shard, RecoveryTargetHandler recoveryTarget, StartRecoveryRequest request,
Supplier<Long> currentClusterStateVersionSupplier,
Function<String, Releasable> delayNewRecoveries, ESLogger logger) {
super(shard, recoveryTarget, request, currentClusterStateVersionSupplier, delayNewRecoveries, -1, logger);
this.shard = shard;
this.request = request;
}

ESIndexLevelReplicationTestCase.java

@ -139,7 +139,7 @@ public abstract class ESIndexLevelReplicationTestCase extends ESTestCase {
new BiFunction<IndexShard, DiscoveryNode, RecoveryTarget>() {
@Override
public RecoveryTarget apply(IndexShard indexShard, DiscoveryNode node) {
return new RecoveryTarget(indexShard, node, recoveryListener) {
return new RecoveryTarget(indexShard, node, recoveryListener, version -> {}) {
@Override
public void renameAllTempFiles() throws IOException {
super.renameAllTempFiles();
@ -274,7 +274,8 @@ public abstract class ESIndexLevelReplicationTestCase extends ESTestCase {
primary.recoverFromStore();
primary.updateRoutingEntry(ShardRoutingHelper.moveToStarted(primary.routingEntry()));
for (IndexShard replicaShard : replicas) {
recoverReplica(replicaShard, (replica, sourceNode) -> new RecoveryTarget(replica, sourceNode, recoveryListener));
recoverReplica(replicaShard,
(replica, sourceNode) -> new RecoveryTarget(replica, sourceNode, recoveryListener, version -> {}));
}
}
@ -302,8 +303,8 @@ public abstract class ESIndexLevelReplicationTestCase extends ESTestCase {
RecoveryTarget recoveryTarget = targetSupplier.apply(replica, pNode);
StartRecoveryRequest request = new StartRecoveryRequest(replica.shardId(), pNode, rNode,
replica.store().getMetadataOrEmpty(), RecoveryState.Type.REPLICA, 0);
RecoverySourceHandler recovery = new RecoverySourceHandler(primary, recoveryTarget, request, (int) ByteSizeUnit.MB.toKB(1),
logger);
RecoverySourceHandler recovery = new RecoverySourceHandler(primary, recoveryTarget, request, () -> 0L, e -> () -> {},
(int) ByteSizeUnit.MB.toKB(1), logger);
recovery.recoverToTarget();
recoveryTarget.markAsDone();
replica.updateRoutingEntry(ShardRoutingHelper.moveToStarted(replica.routingEntry()));

RecoveryDuringReplicationTests.java

@ -66,7 +66,7 @@ public class RecoveryDuringReplicationTests extends ESIndexLevelReplicationTestC
BlockingTarget(RecoveryState.Stage stageToBlock, CountDownLatch recoveryBlocked, CountDownLatch releaseRecovery, IndexShard shard,
DiscoveryNode sourceNode, RecoveryTargetService.RecoveryListener listener, ESLogger logger) {
super(shard, sourceNode, listener);
super(shard, sourceNode, listener, version -> {});
this.recoveryBlocked = recoveryBlocked;
this.releaseRecovery = releaseRecovery;
this.stageToBlock = stageToBlock;

RecoverySourceHandlerTests.java

@ -24,6 +24,7 @@ import org.apache.lucene.document.StringField;
import org.apache.lucene.document.TextField;
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.RandomIndexWriter;
import org.apache.lucene.store.BaseDirectoryWrapper;
@ -35,15 +36,20 @@ import org.elasticsearch.Version;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.io.FileSystemUtils;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lucene.store.IndexOutputOutputStream;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.LocalTransportAddress;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardRelocatedException;
import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.DirectoryService;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.store.StoreFileMetaData;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.test.CorruptionUtils;
import org.elasticsearch.test.DummyShardLock;
import org.elasticsearch.test.ESTestCase;
@ -55,9 +61,15 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.function.Supplier;
import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class RecoverySourceHandlerTests extends ESTestCase {
private static final IndexSettings INDEX_SETTINGS = IndexSettingsModule.newIndexSettings("index", Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, org.elasticsearch.Version.CURRENT).build());
@ -73,8 +85,8 @@ public class RecoverySourceHandlerTests extends ESTestCase {
new DiscoveryNode("b", LocalTransportAddress.buildUnique(), emptyMap(), emptySet(), Version.CURRENT),
null, RecoveryState.Type.STORE, randomLong());
Store store = newStore(createTempDir());
RecoverySourceHandler handler = new RecoverySourceHandler(null, null, request, recoverySettings.getChunkSize().bytesAsInt(),
logger);
RecoverySourceHandler handler = new RecoverySourceHandler(null, null, request, () -> 0L, e -> () -> {},
recoverySettings.getChunkSize().bytesAsInt(), logger);
Directory dir = store.directory();
RandomIndexWriter writer = new RandomIndexWriter(random(), dir, newIndexWriterConfig());
int numDocs = randomIntBetween(10, 100);
@ -125,7 +137,8 @@ public class RecoverySourceHandlerTests extends ESTestCase {
Path tempDir = createTempDir();
Store store = newStore(tempDir, false);
AtomicBoolean failedEngine = new AtomicBoolean(false);
RecoverySourceHandler handler = new RecoverySourceHandler(null, null, request, recoverySettings.getChunkSize().bytesAsInt(), logger) {
RecoverySourceHandler handler = new RecoverySourceHandler(null, null, request, () -> 0L, e -> () -> {},
recoverySettings.getChunkSize().bytesAsInt(), logger) {
@Override
protected void failEngine(IOException cause) {
assertFalse(failedEngine.get());
@ -188,7 +201,8 @@ public class RecoverySourceHandlerTests extends ESTestCase {
Path tempDir = createTempDir();
Store store = newStore(tempDir, false);
AtomicBoolean failedEngine = new AtomicBoolean(false);
RecoverySourceHandler handler = new RecoverySourceHandler(null, null, request, recoverySettings.getChunkSize().bytesAsInt(), logger) {
RecoverySourceHandler handler = new RecoverySourceHandler(null, null, request, () -> 0L, e -> () -> {},
recoverySettings.getChunkSize().bytesAsInt(), logger) {
@Override
protected void failEngine(IOException cause) {
assertFalse(failedEngine.get());
@ -237,6 +251,99 @@ public class RecoverySourceHandlerTests extends ESTestCase {
IOUtils.close(store, targetStore);
}
public void testThrowExceptionOnPrimaryRelocatedBeforePhase1Completed() throws IOException {
final RecoverySettings recoverySettings = new RecoverySettings(Settings.EMPTY, service);
StartRecoveryRequest request = new StartRecoveryRequest(shardId,
new DiscoveryNode("b", LocalTransportAddress.buildUnique(), emptyMap(), emptySet(), Version.CURRENT),
new DiscoveryNode("b", LocalTransportAddress.buildUnique(), emptyMap(), emptySet(), Version.CURRENT),
null, RecoveryState.Type.REPLICA, randomLong());
IndexShard shard = mock(IndexShard.class);
Translog.View translogView = mock(Translog.View.class);
when(shard.acquireTranslogView()).thenReturn(translogView);
when(shard.state()).thenReturn(IndexShardState.RELOCATED);
AtomicBoolean phase1Called = new AtomicBoolean();
AtomicBoolean phase2Called = new AtomicBoolean();
RecoverySourceHandler handler = new RecoverySourceHandler(shard, null, request, () -> 0L, e -> () -> {},
recoverySettings.getChunkSize().bytesAsInt(), logger) {
@Override
public void phase1(final IndexCommit snapshot, final Translog.View translogView) {
phase1Called.set(true);
}
@Override
public void phase2(Translog.Snapshot snapshot) {
phase2Called.set(true);
}
};
expectThrows(IndexShardRelocatedException.class, () -> handler.recoverToTarget());
assertTrue(phase1Called.get());
assertFalse(phase2Called.get());
}
public void testWaitForClusterStateOnPrimaryRelocation() throws IOException, InterruptedException {
final RecoverySettings recoverySettings = new RecoverySettings(Settings.EMPTY, service);
StartRecoveryRequest request = new StartRecoveryRequest(shardId,
new DiscoveryNode("b", LocalTransportAddress.buildUnique(), emptyMap(), emptySet(), Version.CURRENT),
new DiscoveryNode("b", LocalTransportAddress.buildUnique(), emptyMap(), emptySet(), Version.CURRENT),
null, RecoveryState.Type.PRIMARY_RELOCATION, randomLong());
AtomicBoolean phase1Called = new AtomicBoolean();
AtomicBoolean phase2Called = new AtomicBoolean();
AtomicBoolean ensureClusterStateVersionCalled = new AtomicBoolean();
AtomicBoolean recoveriesDelayed = new AtomicBoolean();
AtomicBoolean relocated = new AtomicBoolean();
IndexShard shard = mock(IndexShard.class);
Translog.View translogView = mock(Translog.View.class);
when(shard.acquireTranslogView()).thenReturn(translogView);
when(shard.state()).then(i -> relocated.get() ? IndexShardState.RELOCATED : IndexShardState.STARTED);
doAnswer(i -> {
relocated.set(true);
assertTrue(recoveriesDelayed.get());
return null;
}).when(shard).relocated(any(String.class));
RecoveryTargetHandler targetHandler = mock(RecoveryTargetHandler.class);
final Supplier<Long> currentClusterStateVersionSupplier = () -> {
assertFalse(ensureClusterStateVersionCalled.get());
assertTrue(recoveriesDelayed.get());
ensureClusterStateVersionCalled.set(true);
return 0L;
};
final Function<String, Releasable> delayNewRecoveries = s -> {
assertTrue(phase1Called.get());
assertTrue(phase2Called.get());
assertFalse(recoveriesDelayed.get());
recoveriesDelayed.set(true);
return () -> {
assertTrue(recoveriesDelayed.get());
recoveriesDelayed.set(false);
};
};
RecoverySourceHandler handler = new RecoverySourceHandler(shard, targetHandler, request, currentClusterStateVersionSupplier,
delayNewRecoveries, recoverySettings.getChunkSize().bytesAsInt(), logger) {
@Override
public void phase1(final IndexCommit snapshot, final Translog.View translogView) {
phase1Called.set(true);
}
@Override
public void phase2(Translog.Snapshot snapshot) {
phase2Called.set(true);
}
};
handler.recoverToTarget();
assertTrue(ensureClusterStateVersionCalled.get());
assertTrue(phase1Called.get());
assertTrue(phase2Called.get());
assertTrue(relocated.get());
assertFalse(recoveriesDelayed.get());
}
private Store newStore(Path path) throws IOException {
return newStore(path, true);
}

RecoveryStatusTests.java

@ -51,7 +51,7 @@ public class RecoveryStatusTests extends ESSingleNodeTestCase {
@Override
public void onRecoveryFailure(RecoveryState state, RecoveryFailedException e, boolean sendShardFailure) {
}
});
}, version -> {});
try (IndexOutput indexOutput = status.openAndPutIndexOutput("foo.bar", new StoreFileMetaData("foo.bar", 8 + CodecUtil.footerLength(), "9z51nw"), status.store())) {
indexOutput.writeInt(1);
IndexOutput openIndexOutput = status.getOpenIndexOutput("foo.bar");

RecoveriesCollectionTests.java

@ -19,34 +19,20 @@
package org.elasticsearch.recovery;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.TestShardRouting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.LocalTransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.replication.ESIndexLevelReplicationTestCase;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.recovery.RecoveriesCollection;
import org.elasticsearch.indices.recovery.RecoveryFailedException;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.indices.recovery.RecoveryTargetService;
import org.elasticsearch.test.ESSingleNodeTestCase;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.junit.After;
import org.junit.Before;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.lessThan;
@ -65,7 +51,7 @@ public class RecoveriesCollectionTests extends ESIndexLevelReplicationTestCase {
public void testLastAccessTimeUpdate() throws Exception {
try (ReplicationGroup shards = createGroup(0)) {
final RecoveriesCollection collection = new RecoveriesCollection(logger, threadPool);
final RecoveriesCollection collection = new RecoveriesCollection(logger, threadPool, v -> {});
final long recoveryId = startRecovery(collection, shards.getPrimaryNode(), shards.addReplica());
try (RecoveriesCollection.RecoveryRef status = collection.getRecovery(recoveryId)) {
final long lastSeenTime = status.status().lastAccessTime();
@ -82,7 +68,7 @@ public class RecoveriesCollectionTests extends ESIndexLevelReplicationTestCase {
public void testRecoveryTimeout() throws Exception {
try (ReplicationGroup shards = createGroup(0)) {
final RecoveriesCollection collection = new RecoveriesCollection(logger, threadPool);
final RecoveriesCollection collection = new RecoveriesCollection(logger, threadPool, v -> {});
final AtomicBoolean failed = new AtomicBoolean();
final CountDownLatch latch = new CountDownLatch(1);
final long recoveryId = startRecovery(collection, shards.getPrimaryNode(), shards.addReplica(),
@ -110,7 +96,7 @@ public class RecoveriesCollectionTests extends ESIndexLevelReplicationTestCase {
public void testRecoveryCancellation() throws Exception {
try (ReplicationGroup shards = createGroup(0)) {
final RecoveriesCollection collection = new RecoveriesCollection(logger, threadPool);
final RecoveriesCollection collection = new RecoveriesCollection(logger, threadPool, v -> {});
final long recoveryId = startRecovery(collection, shards.getPrimaryNode(), shards.addReplica());
final long recoveryId2 = startRecovery(collection, shards.getPrimaryNode(), shards.addReplica());
try (RecoveriesCollection.RecoveryRef recoveryRef = collection.getRecovery(recoveryId)) {
@ -129,7 +115,7 @@ public class RecoveriesCollectionTests extends ESIndexLevelReplicationTestCase {
shards.startAll();
int numDocs = randomIntBetween(1, 15);
shards.indexDocs(numDocs);
final RecoveriesCollection collection = new RecoveriesCollection(logger, threadPool);
final RecoveriesCollection collection = new RecoveriesCollection(logger, threadPool, v -> {});
IndexShard shard = shards.addReplica();
final long recoveryId = startRecovery(collection, shards.getPrimaryNode(), shard);
try (RecoveriesCollection.RecoveryRef recovery = collection.getRecovery(recoveryId)) {

RelocationIT.java

@ -444,7 +444,7 @@ public class RelocationIT extends ESIntegTestCase {
assertAcked(prepareCreate("test").setSettings(Settings.builder()
.put("index.routing.allocation.exclude.color", "blue")
.put(indexSettings())
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0) // NORELEASE: set to randomInt(halfNodes - 1) once replica data loss is fixed
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, randomInt(halfNodes - 1))
));
assertAllShardsOnNodes("test", redFuture.get().toArray(new String[2]));
int numDocs = randomIntBetween(100, 150);