Add proper handoff between old and new copy of relocating primary shard

When primary relocation completes, a cluster state is propagated that deactivates the old primary and marks the new primary as active.
As cluster state changes are not applied synchronously on all nodes, there can be a time interval where the relocation target has processed
the cluster state and believes to be the active primary and the relocation source has not yet processed the cluster state update and
still believes itself to be the active primary. This commit ensures that, before completing the relocation, the reloction source deactivates
writing to its store and delegates requests to the relocation target.

Closes #15900
This commit is contained in:
Yannick Welsch 2016-02-01 11:11:57 +01:00
parent e1006ea400
commit 10b5ffcda5
28 changed files with 576 additions and 235 deletions

View File

@ -58,7 +58,7 @@ public class TransportShardFlushAction extends TransportReplicationAction<ShardF
} }
@Override @Override
protected Tuple<ReplicationResponse, ShardFlushRequest> shardOperationOnPrimary(MetaData metaData, ShardFlushRequest shardRequest) throws Throwable { protected Tuple<ReplicationResponse, ShardFlushRequest> shardOperationOnPrimary(MetaData metaData, ShardFlushRequest shardRequest) {
IndexShard indexShard = indicesService.indexServiceSafe(shardRequest.shardId().getIndex()).getShard(shardRequest.shardId().id()); IndexShard indexShard = indicesService.indexServiceSafe(shardRequest.shardId().getIndex()).getShard(shardRequest.shardId().id());
indexShard.flush(shardRequest.getRequest()); indexShard.flush(shardRequest.getRequest());
logger.trace("{} flush request executed on primary", indexShard.shardId()); logger.trace("{} flush request executed on primary", indexShard.shardId());

View File

@ -60,7 +60,7 @@ public class TransportShardRefreshAction extends TransportReplicationAction<Basi
} }
@Override @Override
protected Tuple<ReplicationResponse, BasicReplicationRequest> shardOperationOnPrimary(MetaData metaData, BasicReplicationRequest shardRequest) throws Throwable { protected Tuple<ReplicationResponse, BasicReplicationRequest> shardOperationOnPrimary(MetaData metaData, BasicReplicationRequest shardRequest) {
IndexShard indexShard = indicesService.indexServiceSafe(shardRequest.shardId().getIndex()).getShard(shardRequest.shardId().id()); IndexShard indexShard = indicesService.indexServiceSafe(shardRequest.shardId().getIndex()).getShard(shardRequest.shardId().id());
indexShard.refresh("api"); indexShard.refresh("api");
logger.trace("{} refresh request executed on primary", indexShard.shardId()); logger.trace("{} refresh request executed on primary", indexShard.shardId());

View File

@ -140,7 +140,7 @@ public class TransportIndexAction extends TransportReplicationAction<IndexReques
} }
@Override @Override
protected Tuple<IndexResponse, IndexRequest> shardOperationOnPrimary(MetaData metaData, IndexRequest request) throws Throwable { protected Tuple<IndexResponse, IndexRequest> shardOperationOnPrimary(MetaData metaData, IndexRequest request) throws Exception {
// validate, if routing is required, that we got routing // validate, if routing is required, that we got routing
IndexMetaData indexMetaData = metaData.index(request.shardId().getIndex()); IndexMetaData indexMetaData = metaData.index(request.shardId().getIndex());
@ -200,7 +200,7 @@ public class TransportIndexAction extends TransportReplicationAction<IndexReques
* Execute the given {@link IndexRequest} on a primary shard, throwing a * Execute the given {@link IndexRequest} on a primary shard, throwing a
* {@link RetryOnPrimaryException} if the operation needs to be re-tried. * {@link RetryOnPrimaryException} if the operation needs to be re-tried.
*/ */
public static WriteResult<IndexResponse> executeIndexRequestOnPrimary(IndexRequest request, IndexShard indexShard, MappingUpdatedAction mappingUpdatedAction) throws Throwable { public static WriteResult<IndexResponse> executeIndexRequestOnPrimary(IndexRequest request, IndexShard indexShard, MappingUpdatedAction mappingUpdatedAction) throws Exception {
Engine.Index operation = prepareIndexOperationOnPrimary(request, indexShard); Engine.Index operation = prepareIndexOperationOnPrimary(request, indexShard);
Mapping update = operation.parsedDoc().dynamicMappingsUpdate(); Mapping update = operation.parsedDoc().dynamicMappingsUpdate();
final ShardId shardId = indexShard.shardId(); final ShardId shardId = indexShard.shardId();

View File

@ -56,6 +56,7 @@ import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.index.IndexService; import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.engine.VersionConflictEngineException; import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.translog.Translog; import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.IndicesService;
@ -156,10 +157,11 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
/** /**
* Primary operation on node with primary copy, the provided metadata should be used for request validation if needed * Primary operation on node with primary copy, the provided metadata should be used for request validation if needed
*
* @return A tuple containing not null values, as first value the result of the primary operation and as second value * @return A tuple containing not null values, as first value the result of the primary operation and as second value
* the request to be executed on the replica shards. * the request to be executed on the replica shards.
*/ */
protected abstract Tuple<Response, ReplicaRequest> shardOperationOnPrimary(MetaData metaData, Request shardRequest) throws Throwable; protected abstract Tuple<Response, ReplicaRequest> shardOperationOnPrimary(MetaData metaData, Request shardRequest) throws Exception;
/** /**
* Replica operation on nodes with replica copies * Replica operation on nodes with replica copies
@ -299,7 +301,7 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
setShard(shardId); setShard(shardId);
} }
public RetryOnReplicaException(StreamInput in) throws IOException{ public RetryOnReplicaException(StreamInput in) throws IOException {
super(in); super(in);
} }
} }
@ -326,8 +328,8 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
public void onNewClusterState(ClusterState state) { public void onNewClusterState(ClusterState state) {
context.close(); context.close();
// Forking a thread on local node via transport service so that custom transport service have an // Forking a thread on local node via transport service so that custom transport service have an
// opportunity to execute custom logic before the replica operation begins // opportunity to execute custom logic before the replica operation begins
String extraMessage = "action [" + transportReplicaAction + "], request[" + request + "]"; String extraMessage = "action [" + transportReplicaAction + "], request[" + request + "]";
TransportChannelResponseHandler<TransportResponse.Empty> handler = TransportChannelResponseHandler.emptyResponseHandler(logger, channel, extraMessage); TransportChannelResponseHandler<TransportResponse.Empty> handler = TransportChannelResponseHandler.emptyResponseHandler(logger, channel, extraMessage);
transportService.sendRequest(clusterService.localNode(), transportReplicaAction, request, handler); transportService.sendRequest(clusterService.localNode(), transportReplicaAction, request, handler);
} }
@ -352,6 +354,7 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
} }
} }
} }
private void failReplicaIfNeeded(Throwable t) { private void failReplicaIfNeeded(Throwable t) {
String index = request.shardId().getIndex().getName(); String index = request.shardId().getIndex().getName();
int shardId = request.shardId().id(); int shardId = request.shardId().id();
@ -383,7 +386,7 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
@Override @Override
protected void doRun() throws Exception { protected void doRun() throws Exception {
assert request.shardId() != null : "request shardId must be set"; assert request.shardId() != null : "request shardId must be set";
try (Releasable ignored = getIndexShardOperationsCounter(request.shardId())) { try (Releasable ignored = getIndexShardReferenceOnReplica(request.shardId())) {
shardOperationOnReplica(request); shardOperationOnReplica(request);
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.trace("action [{}] completed on shard [{}] for request [{}]", transportReplicaAction, request.shardId(), request); logger.trace("action [{}] completed on shard [{}] for request [{}]", transportReplicaAction, request.shardId(), request);
@ -399,7 +402,7 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
setShard(shardId); setShard(shardId);
} }
public RetryOnPrimaryException(StreamInput in) throws IOException{ public RetryOnPrimaryException(StreamInput in) throws IOException {
super(in); super(in);
} }
} }
@ -445,6 +448,7 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
handleBlockException(blockException); handleBlockException(blockException);
return; return;
} }
// request does not have a shardId yet, we need to pass the concrete index to resolve shardId // request does not have a shardId yet, we need to pass the concrete index to resolve shardId
resolveRequest(state.metaData(), concreteIndex, request); resolveRequest(state.metaData(), concreteIndex, request);
assert request.shardId() != null : "request shardId must be set in resolveRequest"; assert request.shardId() != null : "request shardId must be set in resolveRequest";
@ -584,60 +588,71 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
} }
/** /**
* Responsible for performing primary operation locally and delegating to replication action once successful * Responsible for performing primary operation locally or delegating primary operation to relocation target in case where shard has
* been marked as RELOCATED. Delegates to replication action once successful.
* <p> * <p>
* Note that as soon as we move to replication action, state responsibility is transferred to {@link ReplicationPhase}. * Note that as soon as we move to replication action, state responsibility is transferred to {@link ReplicationPhase}.
*/ */
final class PrimaryPhase extends AbstractRunnable { class PrimaryPhase extends AbstractRunnable {
private final Request request; private final Request request;
private final ShardId shardId;
private final TransportChannel channel; private final TransportChannel channel;
private final ClusterState state; private final ClusterState state;
private final AtomicBoolean finished = new AtomicBoolean(); private final AtomicBoolean finished = new AtomicBoolean();
private Releasable indexShardReference; private IndexShardReference indexShardReference;
PrimaryPhase(Request request, TransportChannel channel) { PrimaryPhase(Request request, TransportChannel channel) {
this.state = clusterService.state(); this.state = clusterService.state();
this.request = request; this.request = request;
assert request.shardId() != null : "request shardId must be set prior to primary phase";
this.shardId = request.shardId();
this.channel = channel; this.channel = channel;
} }
@Override @Override
public void onFailure(Throwable e) { public void onFailure(Throwable e) {
if (ExceptionsHelper.status(e) == RestStatus.CONFLICT) {
if (logger.isTraceEnabled()) {
logger.trace("failed to execute [{}] on [{}]", e, request, shardId);
}
} else {
if (logger.isDebugEnabled()) {
logger.debug("failed to execute [{}] on [{}]", e, request, shardId);
}
}
finishAsFailed(e); finishAsFailed(e);
} }
@Override @Override
protected void doRun() throws Exception { protected void doRun() throws Exception {
// request shardID was set in ReroutePhase // request shardID was set in ReroutePhase
assert request.shardId() != null : "request shardID must be set prior to primary phase";
final ShardId shardId = request.shardId();
final String writeConsistencyFailure = checkWriteConsistency(shardId); final String writeConsistencyFailure = checkWriteConsistency(shardId);
if (writeConsistencyFailure != null) { if (writeConsistencyFailure != null) {
finishBecauseUnavailable(shardId, writeConsistencyFailure); finishBecauseUnavailable(shardId, writeConsistencyFailure);
return; return;
} }
final ReplicationPhase replicationPhase; // closed in finishAsFailed(e) in the case of error
try { indexShardReference = getIndexShardReferenceOnPrimary(shardId);
indexShardReference = getIndexShardOperationsCounter(shardId); if (indexShardReference.isRelocated() == false) {
// execute locally
Tuple<Response, ReplicaRequest> primaryResponse = shardOperationOnPrimary(state.metaData(), request); Tuple<Response, ReplicaRequest> primaryResponse = shardOperationOnPrimary(state.metaData(), request);
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.trace("action [{}] completed on shard [{}] for request [{}] with cluster state version [{}]", transportPrimaryAction, shardId, request, state.version()); logger.trace("action [{}] completed on shard [{}] for request [{}] with cluster state version [{}]", transportPrimaryAction, shardId, request, state.version());
} }
replicationPhase = new ReplicationPhase(primaryResponse.v2(), primaryResponse.v1(), shardId, channel, indexShardReference); ReplicationPhase replicationPhase = new ReplicationPhase(primaryResponse.v2(), primaryResponse.v1(), shardId, channel, indexShardReference);
} catch (Throwable e) { finishAndMoveToReplication(replicationPhase);
if (ExceptionsHelper.status(e) == RestStatus.CONFLICT) { } else {
if (logger.isTraceEnabled()) { // delegate primary phase to relocation target
logger.trace("failed to execute [{}] on [{}]", e, request, shardId); // it is safe to execute primary phase on relocation target as there are no more in-flight operations where primary
} // phase is executed on local shard and all subsequent operations are executed on relocation target as primary phase.
} else { final ShardRouting primary = indexShardReference.routingEntry();
if (logger.isDebugEnabled()) { indexShardReference.close();
logger.debug("failed to execute [{}] on [{}]", e, request, shardId); assert primary.relocating() : "indexShard is marked as relocated but routing isn't" + primary;
} DiscoveryNode relocatingNode = state.nodes().get(primary.relocatingNodeId());
} transportService.sendRequest(relocatingNode, transportPrimaryAction, request, transportOptions,
finishAsFailed(e); TransportChannelResponseHandler.responseHandler(logger, TransportReplicationAction.this::newResponseInstance, channel,
return; "rerouting indexing to target primary " + primary));
} }
finishAndMoveToReplication(replicationPhase);
} }
/** /**
@ -721,10 +736,24 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
} }
} }
protected Releasable getIndexShardOperationsCounter(ShardId shardId) { /**
* returns a new reference to {@link IndexShard} to perform a primary operation. Released after performing primary operation locally
* and replication of the operation to all replica shards is completed / failed (see {@link ReplicationPhase}).
*/
protected IndexShardReference getIndexShardReferenceOnPrimary(ShardId shardId) {
IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex()); IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
IndexShard indexShard = indexService.getShard(shardId.id()); IndexShard indexShard = indexService.getShard(shardId.id());
return new IndexShardReference(indexShard); return new IndexShardReferenceImpl(indexShard, true);
}
/**
* returns a new reference to {@link IndexShard} on a node that the request is replicated to. The reference is closed as soon as
* replication is completed on the node.
*/
protected IndexShardReference getIndexShardReferenceOnReplica(ShardId shardId) {
IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
IndexShard indexShard = indexService.getShard(shardId.id());
return new IndexShardReferenceImpl(indexShard, false);
} }
/** /**
@ -777,17 +806,20 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
int numberOfIgnoredShardInstances = 0; int numberOfIgnoredShardInstances = 0;
int numberOfPendingShardInstances = 0; int numberOfPendingShardInstances = 0;
for (ShardRouting shard : shards) { for (ShardRouting shard : shards) {
// the following logic to select the shards to replicate to is mirrored and explained in the doRun method below
if (shard.primary() == false && executeOnReplica == false) { if (shard.primary() == false && executeOnReplica == false) {
numberOfIgnoredShardInstances++; numberOfIgnoredShardInstances++;
} else if (shard.unassigned()) { continue;
}
if (shard.unassigned()) {
numberOfIgnoredShardInstances++; numberOfIgnoredShardInstances++;
} else { continue;
if (shard.currentNodeId().equals(nodes.localNodeId()) == false) { }
numberOfPendingShardInstances++; if (nodes.localNodeId().equals(shard.currentNodeId()) == false) {
} numberOfPendingShardInstances++;
if (shard.relocating()) { }
numberOfPendingShardInstances++; if (shard.relocating() && nodes.localNodeId().equals(shard.relocatingNodeId()) == false) {
} numberOfPendingShardInstances++;
} }
} }
// one for the local primary copy // one for the local primary copy
@ -795,7 +827,7 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
this.pending = new AtomicInteger(numberOfPendingShardInstances); this.pending = new AtomicInteger(numberOfPendingShardInstances);
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.trace("replication phase started. pending [{}], action [{}], request [{}], cluster state version used [{}]", pending.get(), logger.trace("replication phase started. pending [{}], action [{}], request [{}], cluster state version used [{}]", pending.get(),
transportReplicaAction, replicaRequest, state.version()); transportReplicaAction, replicaRequest, state.version());
} }
} }
@ -860,7 +892,8 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
performOnReplica(shard); performOnReplica(shard);
} }
// send operation to relocating shard // send operation to relocating shard
if (shard.relocating()) { // local shard can be a relocation target of a primary that is in relocated state
if (shard.relocating() && nodes.localNodeId().equals(shard.relocatingNodeId()) == false) {
performOnReplica(shard.buildTargetRelocatingShard()); performOnReplica(shard.buildTargetRelocatingShard());
} }
} }
@ -898,22 +931,22 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
String message = String.format(Locale.ROOT, "failed to perform %s on replica on node %s", transportReplicaAction, node); String message = String.format(Locale.ROOT, "failed to perform %s on replica on node %s", transportReplicaAction, node);
logger.warn("[{}] {}", exp, shardId, message); logger.warn("[{}] {}", exp, shardId, message);
shardStateAction.shardFailed( shardStateAction.shardFailed(
shard, shard,
indexUUID, indexUUID,
message, message,
exp, exp,
new ShardStateAction.Listener() { new ShardStateAction.Listener() {
@Override @Override
public void onSuccess() { public void onSuccess() {
onReplicaFailure(nodeId, exp); onReplicaFailure(nodeId, exp);
} }
@Override @Override
public void onFailure(Throwable t) { public void onFailure(Throwable t) {
// TODO: handle catastrophic non-channel failures // TODO: handle catastrophic non-channel failures
onReplicaFailure(nodeId, exp); onReplicaFailure(nodeId, exp);
}
} }
}
); );
} }
} }
@ -993,21 +1026,39 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
return IndexMetaData.isIndexUsingShadowReplicas(settings) == false; return IndexMetaData.isIndexUsingShadowReplicas(settings) == false;
} }
static class IndexShardReference implements Releasable { interface IndexShardReference extends Releasable {
boolean isRelocated();
final private IndexShard counter; ShardRouting routingEntry();
private final AtomicBoolean closed = new AtomicBoolean(); }
IndexShardReference(IndexShard counter) { static final class IndexShardReferenceImpl implements IndexShardReference {
counter.incrementOperationCounter();
this.counter = counter; private final IndexShard indexShard;
private final Releasable operationLock;
IndexShardReferenceImpl(IndexShard indexShard, boolean primaryAction) {
this.indexShard = indexShard;
if (primaryAction) {
operationLock = indexShard.acquirePrimaryOperationLock();
} else {
operationLock = indexShard.acquireReplicaOperationLock();
}
} }
@Override @Override
public void close() { public void close() {
if (closed.compareAndSet(false, true)) { operationLock.close();
counter.decrementOperationCounter(); }
}
@Override
public boolean isRelocated() {
return indexShard.state() == IndexShardState.RELOCATED;
}
@Override
public ShardRouting routingEntry() {
return indexShard.routingEntry();
} }
} }

View File

@ -94,7 +94,7 @@ public class MappingUpdatedAction extends AbstractComponent {
} }
} }
public void updateMappingOnMasterAsynchronously(String index, String type, Mapping mappingUpdate) throws Throwable { public void updateMappingOnMasterAsynchronously(String index, String type, Mapping mappingUpdate) throws Exception {
updateMappingOnMaster(index, type, mappingUpdate, dynamicMappingUpdateTimeout, null); updateMappingOnMaster(index, type, mappingUpdate, dynamicMappingUpdateTimeout, null);
} }
@ -102,7 +102,7 @@ public class MappingUpdatedAction extends AbstractComponent {
* Same as {@link #updateMappingOnMasterSynchronously(String, String, Mapping, TimeValue)} * Same as {@link #updateMappingOnMasterSynchronously(String, String, Mapping, TimeValue)}
* using the default timeout. * using the default timeout.
*/ */
public void updateMappingOnMasterSynchronously(String index, String type, Mapping mappingUpdate) throws Throwable { public void updateMappingOnMasterSynchronously(String index, String type, Mapping mappingUpdate) throws Exception {
updateMappingOnMasterSynchronously(index, type, mappingUpdate, dynamicMappingUpdateTimeout); updateMappingOnMasterSynchronously(index, type, mappingUpdate, dynamicMappingUpdateTimeout);
} }
@ -111,7 +111,7 @@ public class MappingUpdatedAction extends AbstractComponent {
* {@code timeout}. When this method returns successfully mappings have * {@code timeout}. When this method returns successfully mappings have
* been applied to the master node and propagated to data nodes. * been applied to the master node and propagated to data nodes.
*/ */
public void updateMappingOnMasterSynchronously(String index, String type, Mapping mappingUpdate, TimeValue timeout) throws Throwable { public void updateMappingOnMasterSynchronously(String index, String type, Mapping mappingUpdate, TimeValue timeout) throws Exception {
if (updateMappingRequest(index, type, mappingUpdate, timeout).get().isAcknowledged() == false) { if (updateMappingRequest(index, type, mappingUpdate, timeout).get().isAcknowledged() == false) {
throw new TimeoutException("Failed to acknowledge mapping update within [" + timeout + "]"); throw new TimeoutException("Failed to acknowledge mapping update within [" + timeout + "]");
} }

View File

@ -42,17 +42,17 @@ import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.common.Booleans; import org.elasticsearch.common.Booleans;
import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.metrics.MeanMetric; import org.elasticsearch.common.metrics.MeanMetric;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.Callback; import org.elasticsearch.common.util.Callback;
import org.elasticsearch.common.util.concurrent.AbstractRefCounted;
import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.common.util.concurrent.SuspendableRefContainer;
import org.elasticsearch.gateway.MetaDataStateFormat; import org.elasticsearch.gateway.MetaDataStateFormat;
import org.elasticsearch.index.IndexModule; import org.elasticsearch.index.IndexModule;
import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.IndexSettings;
@ -189,9 +189,17 @@ public class IndexShard extends AbstractIndexShardComponent {
private final ShardPath path; private final ShardPath path;
private final IndexShardOperationCounter indexShardOperationCounter; private final SuspendableRefContainer suspendableRefContainer;
private final EnumSet<IndexShardState> readAllowedStates = EnumSet.of(IndexShardState.STARTED, IndexShardState.RELOCATED, IndexShardState.POST_RECOVERY); private static final EnumSet<IndexShardState> readAllowedStates = EnumSet.of(IndexShardState.STARTED, IndexShardState.RELOCATED, IndexShardState.POST_RECOVERY);
// for primaries, we only allow to write when actually started (so the cluster has decided we started)
// in case we have a relocation of a primary, we also allow to write after phase 2 completed, where the shard may be
// in state RECOVERING or POST_RECOVERY. After a primary has been marked as RELOCATED, we only allow writes to the relocation target
// which can be either in POST_RECOVERY or already STARTED (this prevents writing concurrently to two primaries).
public static final EnumSet<IndexShardState> writeAllowedStatesForPrimary = EnumSet.of(IndexShardState.RECOVERING, IndexShardState.POST_RECOVERY, IndexShardState.STARTED);
// replication is also allowed while recovering, since we index also during recovery to replicas and rely on version checks to make sure its consistent
// a relocated shard can also be target of a replication if the relocation target has not been marked as active yet and is syncing it's changes back to the relocation source
private static final EnumSet<IndexShardState> writeAllowedStatesForReplica = EnumSet.of(IndexShardState.RECOVERING, IndexShardState.POST_RECOVERY, IndexShardState.STARTED, IndexShardState.RELOCATED);
private final IndexSearcherWrapper searcherWrapper; private final IndexSearcherWrapper searcherWrapper;
@ -250,7 +258,7 @@ public class IndexShard extends AbstractIndexShardComponent {
} }
this.engineConfig = newEngineConfig(translogConfig, cachingPolicy); this.engineConfig = newEngineConfig(translogConfig, cachingPolicy);
this.indexShardOperationCounter = new IndexShardOperationCounter(logger, shardId); this.suspendableRefContainer = new SuspendableRefContainer();
this.provider = provider; this.provider = provider;
this.searcherWrapper = indexSearcherWrapper; this.searcherWrapper = indexSearcherWrapper;
this.percolatorQueriesRegistry = new PercolatorQueriesRegistry(shardId, indexSettings, newQueryShardContext()); this.percolatorQueriesRegistry = new PercolatorQueriesRegistry(shardId, indexSettings, newQueryShardContext());
@ -321,6 +329,8 @@ public class IndexShard extends AbstractIndexShardComponent {
* Updates the shards routing entry. This mutate the shards internal state depending * Updates the shards routing entry. This mutate the shards internal state depending
* on the changes that get introduced by the new routing value. This method will persist shard level metadata * on the changes that get introduced by the new routing value. This method will persist shard level metadata
* unless explicitly disabled. * unless explicitly disabled.
*
* @throws IndexShardRelocatedException if shard is marked as relocated and relocation aborted
*/ */
public void updateRoutingEntry(final ShardRouting newRouting, final boolean persistState) { public void updateRoutingEntry(final ShardRouting newRouting, final boolean persistState) {
final ShardRouting currentRouting = this.shardRouting; final ShardRouting currentRouting = this.shardRouting;
@ -368,6 +378,14 @@ public class IndexShard extends AbstractIndexShardComponent {
} }
} }
} }
if (state == IndexShardState.RELOCATED &&
(newRouting.relocating() == false || newRouting.equalsIgnoringMetaData(currentRouting) == false)) {
// if the shard is marked as RELOCATED we have to fail when any changes in shard routing occur (e.g. due to recovery
// failure / cancellation). The reason is that at the moment we cannot safely move back to STARTED without risking two
// active primaries.
throw new IndexShardRelocatedException(shardId(), "Shard is marked as relocated, cannot safely move to state " + newRouting.state());
}
this.shardRouting = newRouting; this.shardRouting = newRouting;
indexEventListener.shardRoutingChanged(this, currentRouting, newRouting); indexEventListener.shardRoutingChanged(this, currentRouting, newRouting);
} finally { } finally {
@ -404,12 +422,16 @@ public class IndexShard extends AbstractIndexShardComponent {
} }
public IndexShard relocated(String reason) throws IndexShardNotStartedException { public IndexShard relocated(String reason) throws IndexShardNotStartedException {
synchronized (mutex) { try (Releasable block = suspendableRefContainer.blockAcquisition()) {
if (state != IndexShardState.STARTED) { // no shard operation locks are being held here, move state from started to relocated
throw new IndexShardNotStartedException(shardId, state); synchronized (mutex) {
if (state != IndexShardState.STARTED) {
throw new IndexShardNotStartedException(shardId, state);
}
changeState(IndexShardState.RELOCATED, reason);
} }
changeState(IndexShardState.RELOCATED, reason);
} }
return this; return this;
} }
@ -796,7 +818,6 @@ public class IndexShard extends AbstractIndexShardComponent {
refreshScheduledFuture = null; refreshScheduledFuture = null;
} }
changeState(IndexShardState.CLOSED, reason); changeState(IndexShardState.CLOSED, reason);
indexShardOperationCounter.decRef();
} finally { } finally {
final Engine engine = this.currentEngineReference.getAndSet(null); final Engine engine = this.currentEngineReference.getAndSet(null);
try { try {
@ -810,7 +831,6 @@ public class IndexShard extends AbstractIndexShardComponent {
} }
} }
public IndexShard postRecovery(String reason) throws IndexShardStartedException, IndexShardRelocatedException, IndexShardClosedException { public IndexShard postRecovery(String reason) throws IndexShardStartedException, IndexShardRelocatedException, IndexShardClosedException {
if (mapperService.hasMapping(PercolatorService.TYPE_NAME)) { if (mapperService.hasMapping(PercolatorService.TYPE_NAME)) {
refresh("percolator_load_queries"); refresh("percolator_load_queries");
@ -967,16 +987,17 @@ public class IndexShard extends AbstractIndexShardComponent {
IndexShardState state = this.state; // one time volatile read IndexShardState state = this.state; // one time volatile read
if (origin == Engine.Operation.Origin.PRIMARY) { if (origin == Engine.Operation.Origin.PRIMARY) {
// for primaries, we only allow to write when actually started (so the cluster has decided we started) if (writeAllowedStatesForPrimary.contains(state) == false) {
// otherwise, we need to retry, we also want to still allow to index if we are relocated in case it fails throw new IllegalIndexShardStateException(shardId, state, "operation only allowed when shard state is one of " + writeAllowedStatesForPrimary + ", origin [" + origin + "]");
if (state != IndexShardState.STARTED && state != IndexShardState.RELOCATED) { }
throw new IllegalIndexShardStateException(shardId, state, "operation only allowed when started/recovering, origin [" + origin + "]"); } else if (origin == Engine.Operation.Origin.RECOVERY) {
if (state != IndexShardState.RECOVERING) {
throw new IllegalIndexShardStateException(shardId, state, "operation only allowed when recovering, origin [" + origin + "]");
} }
} else { } else {
// for replicas, we allow to write also while recovering, since we index also during recovery to replicas assert origin == Engine.Operation.Origin.REPLICA;
// and rely on version checks to make sure its consistent if (writeAllowedStatesForReplica.contains(state) == false) {
if (state != IndexShardState.STARTED && state != IndexShardState.RELOCATED && state != IndexShardState.RECOVERING && state != IndexShardState.POST_RECOVERY) { throw new IllegalIndexShardStateException(shardId, state, "operation only allowed when shard state is one of " + writeAllowedStatesForReplica + ", origin [" + origin + "]");
throw new IllegalIndexShardStateException(shardId, state, "operation only allowed when started/recovering, origin [" + origin + "]");
} }
} }
} }
@ -995,7 +1016,7 @@ public class IndexShard extends AbstractIndexShardComponent {
private void verifyNotClosed(Throwable suppressed) throws IllegalIndexShardStateException { private void verifyNotClosed(Throwable suppressed) throws IllegalIndexShardStateException {
IndexShardState state = this.state; // one time volatile read IndexShardState state = this.state; // one time volatile read
if (state == IndexShardState.CLOSED) { if (state == IndexShardState.CLOSED) {
final IllegalIndexShardStateException exc = new IllegalIndexShardStateException(shardId, state, "operation only allowed when not closed"); final IllegalIndexShardStateException exc = new IndexShardClosedException(shardId, "operation only allowed when not closed");
if (suppressed != null) { if (suppressed != null) {
exc.addSuppressed(suppressed); exc.addSuppressed(suppressed);
} }
@ -1390,37 +1411,21 @@ public class IndexShard extends AbstractIndexShardComponent {
idxSettings.getSettings().getAsTime(IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING, IndexingMemoryController.SHARD_DEFAULT_INACTIVE_TIME)); idxSettings.getSettings().getAsTime(IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING, IndexingMemoryController.SHARD_DEFAULT_INACTIVE_TIME));
} }
private static class IndexShardOperationCounter extends AbstractRefCounted { public Releasable acquirePrimaryOperationLock() {
final private ESLogger logger; verifyNotClosed();
private final ShardId shardId; if (shardRouting.primary() == false) {
throw new IllegalIndexShardStateException(shardId, state, "shard is not a primary");
public IndexShardOperationCounter(ESLogger logger, ShardId shardId) {
super("index-shard-operations-counter");
this.logger = logger;
this.shardId = shardId;
}
@Override
protected void closeInternal() {
logger.debug("operations counter reached 0, will not accept any further writes");
}
@Override
protected void alreadyClosed() {
throw new IndexShardClosedException(shardId, "could not increment operation counter. shard is closed.");
} }
return suspendableRefContainer.acquireUninterruptibly();
} }
public void incrementOperationCounter() { public Releasable acquireReplicaOperationLock() {
indexShardOperationCounter.incRef(); verifyNotClosed();
return suspendableRefContainer.acquireUninterruptibly();
} }
public void decrementOperationCounter() { public int getActiveOperationsCount() {
indexShardOperationCounter.decRef(); return suspendableRefContainer.activeRefs(); // refCount is incremented on creation and decremented on close
}
public int getOperationsCount() {
return Math.max(0, indexShardOperationCounter.refCount() - 1); // refCount is incremented on creation and decremented on close
} }
/** /**

View File

@ -29,10 +29,14 @@ import java.io.IOException;
public class IndexShardRelocatedException extends IllegalIndexShardStateException { public class IndexShardRelocatedException extends IllegalIndexShardStateException {
public IndexShardRelocatedException(ShardId shardId) { public IndexShardRelocatedException(ShardId shardId) {
super(shardId, IndexShardState.RELOCATED, "Already relocated"); this(shardId, "Already relocated");
}
public IndexShardRelocatedException(ShardId shardId, String reason) {
super(shardId, IndexShardState.RELOCATED, reason);
} }
public IndexShardRelocatedException(StreamInput in) throws IOException{ public IndexShardRelocatedException(StreamInput in) throws IOException{
super(in); super(in);
} }
} }

View File

@ -492,7 +492,11 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
// shadow replicas do not support primary promotion. The master would reinitialize the shard, giving it a new allocation, meaning we should be there. // shadow replicas do not support primary promotion. The master would reinitialize the shard, giving it a new allocation, meaning we should be there.
assert (shardRouting.primary() && currentRoutingEntry.primary() == false) == false || indexShard.allowsPrimaryPromotion() : assert (shardRouting.primary() && currentRoutingEntry.primary() == false) == false || indexShard.allowsPrimaryPromotion() :
"shard for doesn't support primary promotion but master promoted it with changing allocation. New routing " + shardRouting + ", current routing " + currentRoutingEntry; "shard for doesn't support primary promotion but master promoted it with changing allocation. New routing " + shardRouting + ", current routing " + currentRoutingEntry;
indexShard.updateRoutingEntry(shardRouting, event.state().blocks().disableStatePersistence() == false); try {
indexShard.updateRoutingEntry(shardRouting, event.state().blocks().disableStatePersistence() == false);
} catch (Throwable e) {
failAndRemoveShard(shardRouting, indexService.indexUUID(), indexService, true, "failed updating shard routing entry", e);
}
} }
} }
@ -626,7 +630,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
// For primaries: requests in any case are routed to both when its relocating and that way we handle // For primaries: requests in any case are routed to both when its relocating and that way we handle
// the edge case where its mark as relocated, and we might need to roll it back... // the edge case where its mark as relocated, and we might need to roll it back...
// For replicas: we are recovering a backup from a primary // For replicas: we are recovering a backup from a primary
RecoveryState.Type type = shardRouting.primary() ? RecoveryState.Type.RELOCATION : RecoveryState.Type.REPLICA; RecoveryState.Type type = shardRouting.primary() ? RecoveryState.Type.PRIMARY_RELOCATION : RecoveryState.Type.REPLICA;
RecoveryState recoveryState = new RecoveryState(indexShard.shardId(), shardRouting.primary(), type, sourceNode, nodes.localNode()); RecoveryState recoveryState = new RecoveryState(indexShard.shardId(), shardRouting.primary(), type, sourceNode, nodes.localNode());
indexShard.markAsRecovering("from " + sourceNode, recoveryState); indexShard.markAsRecovering("from " + sourceNode, recoveryState);
recoveryTarget.startRecovery(indexShard, type, sourceNode, new PeerRecoveryListener(shardRouting, indexService, indexMetaData)); recoveryTarget.startRecovery(indexShard, type, sourceNode, new PeerRecoveryListener(shardRouting, indexService, indexMetaData));

View File

@ -435,7 +435,7 @@ public class SyncedFlushService extends AbstractComponent implements IndexEventL
if (indexShard.routingEntry().primary() == false) { if (indexShard.routingEntry().primary() == false) {
throw new IllegalStateException("[" + request.shardId() +"] expected a primary shard"); throw new IllegalStateException("[" + request.shardId() +"] expected a primary shard");
} }
int opCount = indexShard.getOperationsCount(); int opCount = indexShard.getActiveOperationsCount();
logger.trace("{} in flight operations sampled at [{}]", request.shardId(), opCount); logger.trace("{} in flight operations sampled at [{}]", request.shardId(), opCount);
return new InFlightOpsResponse(opCount); return new InFlightOpsResponse(opCount);
} }

View File

@ -61,8 +61,7 @@ public class RecoverySource extends AbstractComponent implements IndexEventListe
private final ClusterService clusterService; private final ClusterService clusterService;
private final OngoingRecoveres ongoingRecoveries = new OngoingRecoveres(); private final OngoingRecoveries ongoingRecoveries = new OngoingRecoveries();
@Inject @Inject
public RecoverySource(Settings settings, TransportService transportService, IndicesService indicesService, public RecoverySource(Settings settings, TransportService transportService, IndicesService indicesService,
@ -107,11 +106,11 @@ public class RecoverySource extends AbstractComponent implements IndexEventListe
} }
if (!targetShardRouting.initializing()) { if (!targetShardRouting.initializing()) {
logger.debug("delaying recovery of {} as it is not listed as initializing on the target node {}. known shards state is [{}]", logger.debug("delaying recovery of {} as it is not listed as initializing on the target node {}. known shards state is [{}]",
request.shardId(), request.targetNode(), targetShardRouting.state()); request.shardId(), request.targetNode(), targetShardRouting.state());
throw new DelayRecoveryException("source node has the state of the target shard to be [" + targetShardRouting.state() + "], expecting to be [initializing]"); throw new DelayRecoveryException("source node has the state of the target shard to be [" + targetShardRouting.state() + "], expecting to be [initializing]");
} }
logger.trace("[{}][{}] starting recovery to {}, mark_as_relocated {}", request.shardId().getIndex().getName(), request.shardId().id(), request.targetNode(), request.markAsRelocated()); logger.trace("[{}][{}] starting recovery to {}", request.shardId().getIndex().getName(), request.shardId().id(), request.targetNode());
final RecoverySourceHandler handler; final RecoverySourceHandler handler;
if (shard.indexSettings().isOnSharedFilesystem()) { if (shard.indexSettings().isOnSharedFilesystem()) {
handler = new SharedFSRecoverySourceHandler(shard, request, recoverySettings, transportService, logger); handler = new SharedFSRecoverySourceHandler(shard, request, recoverySettings, transportService, logger);
@ -134,8 +133,7 @@ public class RecoverySource extends AbstractComponent implements IndexEventListe
} }
} }
private static final class OngoingRecoveries {
private static final class OngoingRecoveres {
private final Map<IndexShard, Set<RecoverySourceHandler>> ongoingRecoveries = new HashMap<>(); private final Map<IndexShard, Set<RecoverySourceHandler>> ongoingRecoveries = new HashMap<>();
synchronized void add(IndexShard shard, RecoverySourceHandler handler) { synchronized void add(IndexShard shard, RecoverySourceHandler handler) {

View File

@ -393,9 +393,11 @@ public class RecoverySourceHandler {
} }
}); });
if (isPrimaryRelocation()) {
if (request.markAsRelocated()) { /**
// TODO what happens if the recovery process fails afterwards, we need to mark this back to started * if the recovery process fails after setting the shard state to RELOCATED, both relocation source and
* target are failed (see {@link IndexShard#updateRoutingEntry}).
*/
try { try {
shard.relocated("to " + request.targetNode()); shard.relocated("to " + request.targetNode());
} catch (IllegalIndexShardStateException e) { } catch (IllegalIndexShardStateException e) {
@ -406,7 +408,11 @@ public class RecoverySourceHandler {
} }
stopWatch.stop(); stopWatch.stop();
logger.trace("[{}][{}] finalizing recovery to {}: took [{}]", logger.trace("[{}][{}] finalizing recovery to {}: took [{}]",
indexName, shardId, request.targetNode(), stopWatch.totalTime()); indexName, shardId, request.targetNode(), stopWatch.totalTime());
}
protected boolean isPrimaryRelocation() {
return request.recoveryType() == RecoveryState.Type.PRIMARY_RELOCATION;
} }
/** /**

View File

@ -101,7 +101,7 @@ public class RecoveryState implements ToXContent, Streamable {
STORE((byte) 0), STORE((byte) 0),
SNAPSHOT((byte) 1), SNAPSHOT((byte) 1),
REPLICA((byte) 2), REPLICA((byte) 2),
RELOCATION((byte) 3); PRIMARY_RELOCATION((byte) 3);
private static final Type[] TYPES = new Type[Type.values().length]; private static final Type[] TYPES = new Type[Type.values().length];

View File

@ -138,7 +138,6 @@ public class RecoveryTarget extends AbstractComponent implements IndexEventListe
// create a new recovery status, and process... // create a new recovery status, and process...
final long recoveryId = onGoingRecoveries.startRecovery(indexShard, sourceNode, listener, recoverySettings.activityTimeout()); final long recoveryId = onGoingRecoveries.startRecovery(indexShard, sourceNode, listener, recoverySettings.activityTimeout());
threadPool.generic().execute(new RecoveryRunner(recoveryId)); threadPool.generic().execute(new RecoveryRunner(recoveryId));
} }
protected void retryRecovery(final RecoveryStatus recoveryStatus, final Throwable reason, TimeValue retryAfter, final StartRecoveryRequest currentRequest) { protected void retryRecovery(final RecoveryStatus recoveryStatus, final Throwable reason, TimeValue retryAfter, final StartRecoveryRequest currentRequest) {
@ -178,7 +177,7 @@ public class RecoveryTarget extends AbstractComponent implements IndexEventListe
return; return;
} }
final StartRecoveryRequest request = new StartRecoveryRequest(recoveryStatus.shardId(), recoveryStatus.sourceNode(), clusterService.localNode(), final StartRecoveryRequest request = new StartRecoveryRequest(recoveryStatus.shardId(), recoveryStatus.sourceNode(), clusterService.localNode(),
false, metadataSnapshot, recoveryStatus.state().getType(), recoveryStatus.recoveryId()); metadataSnapshot, recoveryStatus.state().getType(), recoveryStatus.recoveryId());
final AtomicReference<RecoveryResponse> responseHolder = new AtomicReference<>(); final AtomicReference<RecoveryResponse> responseHolder = new AtomicReference<>();
try { try {
@ -267,7 +266,6 @@ public class RecoveryTarget extends AbstractComponent implements IndexEventListe
onGoingRecoveries.failRecovery(recoveryStatus.recoveryId(), new RecoveryFailedException(request, "source shard is closed", cause), false); onGoingRecoveries.failRecovery(recoveryStatus.recoveryId(), new RecoveryFailedException(request, "source shard is closed", cause), false);
return; return;
} }
onGoingRecoveries.failRecovery(recoveryStatus.recoveryId(), new RecoveryFailedException(request, e), true); onGoingRecoveries.failRecovery(recoveryStatus.recoveryId(), new RecoveryFailedException(request, e), true);
} }
} }

View File

@ -84,8 +84,4 @@ public class SharedFSRecoverySourceHandler extends RecoverySourceHandler {
return 0; return 0;
} }
private boolean isPrimaryRelocation() {
return request.recoveryType() == RecoveryState.Type.RELOCATION && shard.routingEntry().primary();
}
} }

View File

@ -41,8 +41,6 @@ public class StartRecoveryRequest extends TransportRequest {
private DiscoveryNode targetNode; private DiscoveryNode targetNode;
private boolean markAsRelocated;
private Store.MetadataSnapshot metadataSnapshot; private Store.MetadataSnapshot metadataSnapshot;
private RecoveryState.Type recoveryType; private RecoveryState.Type recoveryType;
@ -56,12 +54,11 @@ public class StartRecoveryRequest extends TransportRequest {
* @param sourceNode The node to recover from * @param sourceNode The node to recover from
* @param targetNode The node to recover to * @param targetNode The node to recover to
*/ */
public StartRecoveryRequest(ShardId shardId, DiscoveryNode sourceNode, DiscoveryNode targetNode, boolean markAsRelocated, Store.MetadataSnapshot metadataSnapshot, RecoveryState.Type recoveryType, long recoveryId) { public StartRecoveryRequest(ShardId shardId, DiscoveryNode sourceNode, DiscoveryNode targetNode, Store.MetadataSnapshot metadataSnapshot, RecoveryState.Type recoveryType, long recoveryId) {
this.recoveryId = recoveryId; this.recoveryId = recoveryId;
this.shardId = shardId; this.shardId = shardId;
this.sourceNode = sourceNode; this.sourceNode = sourceNode;
this.targetNode = targetNode; this.targetNode = targetNode;
this.markAsRelocated = markAsRelocated;
this.recoveryType = recoveryType; this.recoveryType = recoveryType;
this.metadataSnapshot = metadataSnapshot; this.metadataSnapshot = metadataSnapshot;
} }
@ -82,10 +79,6 @@ public class StartRecoveryRequest extends TransportRequest {
return targetNode; return targetNode;
} }
public boolean markAsRelocated() {
return markAsRelocated;
}
public RecoveryState.Type recoveryType() { public RecoveryState.Type recoveryType() {
return recoveryType; return recoveryType;
} }
@ -101,7 +94,6 @@ public class StartRecoveryRequest extends TransportRequest {
shardId = ShardId.readShardId(in); shardId = ShardId.readShardId(in);
sourceNode = DiscoveryNode.readNode(in); sourceNode = DiscoveryNode.readNode(in);
targetNode = DiscoveryNode.readNode(in); targetNode = DiscoveryNode.readNode(in);
markAsRelocated = in.readBoolean();
metadataSnapshot = new Store.MetadataSnapshot(in); metadataSnapshot = new Store.MetadataSnapshot(in);
recoveryType = RecoveryState.Type.fromId(in.readByte()); recoveryType = RecoveryState.Type.fromId(in.readByte());
@ -114,7 +106,6 @@ public class StartRecoveryRequest extends TransportRequest {
shardId.writeTo(out); shardId.writeTo(out);
sourceNode.writeTo(out); sourceNode.writeTo(out);
targetNode.writeTo(out); targetNode.writeTo(out);
out.writeBoolean(markAsRelocated);
metadataSnapshot.writeTo(out); metadataSnapshot.writeTo(out);
out.writeByte(recoveryType.id()); out.writeByte(recoveryType.id());
} }

View File

@ -23,6 +23,7 @@ import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException; import java.io.IOException;
import java.util.function.Supplier;
/** /**
* Base class for delegating transport response to a transport channel * Base class for delegating transport response to a transport channel
@ -30,7 +31,7 @@ import java.io.IOException;
public abstract class TransportChannelResponseHandler<T extends TransportResponse> implements TransportResponseHandler<T> { public abstract class TransportChannelResponseHandler<T extends TransportResponse> implements TransportResponseHandler<T> {
/** /**
* Convenience method for delegating an empty response to the provided changed * Convenience method for delegating an empty response to the provided transport channel
*/ */
public static TransportChannelResponseHandler<TransportResponse.Empty> emptyResponseHandler(ESLogger logger, TransportChannel channel, String extraInfoOnError) { public static TransportChannelResponseHandler<TransportResponse.Empty> emptyResponseHandler(ESLogger logger, TransportChannel channel, String extraInfoOnError) {
return new TransportChannelResponseHandler<TransportResponse.Empty>(logger, channel, extraInfoOnError) { return new TransportChannelResponseHandler<TransportResponse.Empty>(logger, channel, extraInfoOnError) {
@ -41,6 +42,19 @@ public abstract class TransportChannelResponseHandler<T extends TransportRespons
}; };
} }
/**
* Convenience method for delegating a response provided by supplier to the provided transport channel
*/
public static <T extends TransportResponse> TransportChannelResponseHandler responseHandler(ESLogger logger, Supplier<T> responseSupplier, TransportChannel channel, String extraInfoOnError) {
return new TransportChannelResponseHandler<T>(logger, channel, extraInfoOnError) {
@Override
public T newInstance() {
return responseSupplier.get();
}
};
}
private final ESLogger logger; private final ESLogger logger;
private final TransportChannel channel; private final TransportChannel channel;
private final String extraInfoOnError; private final String extraInfoOnError;

View File

@ -56,12 +56,12 @@ public class ClusterStateCreationUtils {
/** /**
* Creates cluster state with and index that has one shard and #(replicaStates) replicas * Creates cluster state with and index that has one shard and #(replicaStates) replicas
* *
* @param index name of the index * @param index name of the index
* @param primaryLocal if primary should coincide with the local node in the cluster state * @param activePrimaryLocal if active primary should coincide with the local node in the cluster state
* @param primaryState state of primary * @param primaryState state of primary
* @param replicaStates states of the replicas. length of this array determines also the number of replicas * @param replicaStates states of the replicas. length of this array determines also the number of replicas
*/ */
public static ClusterState state(String index, boolean primaryLocal, ShardRoutingState primaryState, ShardRoutingState... replicaStates) { public static ClusterState state(String index, boolean activePrimaryLocal, ShardRoutingState primaryState, ShardRoutingState... replicaStates) {
final int numberOfReplicas = replicaStates.length; final int numberOfReplicas = replicaStates.length;
int numberOfNodes = numberOfReplicas + 1; int numberOfNodes = numberOfReplicas + 1;
@ -97,7 +97,7 @@ public class ClusterStateCreationUtils {
String relocatingNode = null; String relocatingNode = null;
UnassignedInfo unassignedInfo = null; UnassignedInfo unassignedInfo = null;
if (primaryState != ShardRoutingState.UNASSIGNED) { if (primaryState != ShardRoutingState.UNASSIGNED) {
if (primaryLocal) { if (activePrimaryLocal) {
primaryNode = newNode(0).id(); primaryNode = newNode(0).id();
unassignedNodes.remove(primaryNode); unassignedNodes.remove(primaryNode);
} else { } else {
@ -173,13 +173,13 @@ public class ClusterStateCreationUtils {
* Creates cluster state with and index that has one shard and as many replicas as numberOfReplicas. * Creates cluster state with and index that has one shard and as many replicas as numberOfReplicas.
* Primary will be STARTED in cluster state but replicas will be one of UNASSIGNED, INITIALIZING, STARTED or RELOCATING. * Primary will be STARTED in cluster state but replicas will be one of UNASSIGNED, INITIALIZING, STARTED or RELOCATING.
* *
* @param index name of the index * @param index name of the index
* @param primaryLocal if primary should coincide with the local node in the cluster state * @param activePrimaryLocal if active primary should coincide with the local node in the cluster state
* @param numberOfReplicas number of replicas * @param numberOfReplicas number of replicas
*/ */
public static ClusterState stateWithStartedPrimary(String index, boolean primaryLocal, int numberOfReplicas) { public static ClusterState stateWithActivePrimary(String index, boolean activePrimaryLocal, int numberOfReplicas) {
int assignedReplicas = randomIntBetween(0, numberOfReplicas); int assignedReplicas = randomIntBetween(0, numberOfReplicas);
return stateWithStartedPrimary(index, primaryLocal, assignedReplicas, numberOfReplicas - assignedReplicas); return stateWithActivePrimary(index, activePrimaryLocal, assignedReplicas, numberOfReplicas - assignedReplicas);
} }
/** /**
@ -188,11 +188,11 @@ public class ClusterStateCreationUtils {
* some (assignedReplicas) will be one of INITIALIZING, STARTED or RELOCATING. * some (assignedReplicas) will be one of INITIALIZING, STARTED or RELOCATING.
* *
* @param index name of the index * @param index name of the index
* @param primaryLocal if primary should coincide with the local node in the cluster state * @param activePrimaryLocal if active primary should coincide with the local node in the cluster state
* @param assignedReplicas number of replicas that should have INITIALIZING, STARTED or RELOCATING state * @param assignedReplicas number of replicas that should have INITIALIZING, STARTED or RELOCATING state
* @param unassignedReplicas number of replicas that should be unassigned * @param unassignedReplicas number of replicas that should be unassigned
*/ */
public static ClusterState stateWithStartedPrimary(String index, boolean primaryLocal, int assignedReplicas, int unassignedReplicas) { public static ClusterState stateWithActivePrimary(String index, boolean activePrimaryLocal, int assignedReplicas, int unassignedReplicas) {
ShardRoutingState[] replicaStates = new ShardRoutingState[assignedReplicas + unassignedReplicas]; ShardRoutingState[] replicaStates = new ShardRoutingState[assignedReplicas + unassignedReplicas];
// no point in randomizing - node assignment later on does it too. // no point in randomizing - node assignment later on does it too.
for (int i = 0; i < assignedReplicas; i++) { for (int i = 0; i < assignedReplicas; i++) {
@ -201,7 +201,7 @@ public class ClusterStateCreationUtils {
for (int i = assignedReplicas; i < replicaStates.length; i++) { for (int i = assignedReplicas; i < replicaStates.length; i++) {
replicaStates[i] = ShardRoutingState.UNASSIGNED; replicaStates[i] = ShardRoutingState.UNASSIGNED;
} }
return state(index, primaryLocal, randomFrom(ShardRoutingState.STARTED, ShardRoutingState.RELOCATING), replicaStates); return state(index, activePrimaryLocal, randomFrom(ShardRoutingState.STARTED, ShardRoutingState.RELOCATING), replicaStates);
} }
/** /**

View File

@ -37,6 +37,7 @@ import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.ShardIterator; import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRouting;
@ -75,9 +76,10 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import static org.elasticsearch.action.support.replication.ClusterStateCreationUtils.state; import static org.elasticsearch.action.support.replication.ClusterStateCreationUtils.state;
import static org.elasticsearch.action.support.replication.ClusterStateCreationUtils.stateWithStartedPrimary; import static org.elasticsearch.action.support.replication.ClusterStateCreationUtils.stateWithActivePrimary;
import static org.hamcrest.CoreMatchers.not; import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.Matchers.arrayWithSize; import static org.hamcrest.Matchers.arrayWithSize;
import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.empty;
@ -225,7 +227,7 @@ public class TransportReplicationActionTests extends ESTestCase {
final String index = "test"; final String index = "test";
final ShardId shardId = new ShardId(index, "_na_", 0); final ShardId shardId = new ShardId(index, "_na_", 0);
clusterService.setState(stateWithStartedPrimary(index, randomBoolean(), 3)); clusterService.setState(stateWithActivePrimary(index, randomBoolean(), 3));
logger.debug("using state: \n{}", clusterService.state().prettyPrint()); logger.debug("using state: \n{}", clusterService.state().prettyPrint());
@ -249,33 +251,73 @@ public class TransportReplicationActionTests extends ESTestCase {
assertIndexShardUninitialized(); assertIndexShardUninitialized();
} }
public void testPrimaryPhaseExecutesRequest() throws InterruptedException, ExecutionException { public void testPrimaryPhaseExecutesOrDelegatesRequestToRelocationTarget() throws InterruptedException, ExecutionException {
final String index = "test"; final String index = "test";
final ShardId shardId = new ShardId(index, "_na_", 0); final ShardId shardId = new ShardId(index, "_na_", 0);
clusterService.setState(state(index, true, ShardRoutingState.STARTED, ShardRoutingState.STARTED)); ClusterState state = stateWithActivePrimary(index, true, randomInt(5));
clusterService.setState(state);
Request request = new Request(shardId).timeout("1ms"); Request request = new Request(shardId).timeout("1ms");
PlainActionFuture<Response> listener = new PlainActionFuture<>(); PlainActionFuture<Response> listener = new PlainActionFuture<>();
TransportReplicationAction.PrimaryPhase primaryPhase = action.new PrimaryPhase(request, createTransportChannel(listener)); AtomicBoolean movedToReplication = new AtomicBoolean();
TransportReplicationAction.PrimaryPhase primaryPhase = action.new PrimaryPhase(request, createTransportChannel(listener)) {
@Override
void finishAndMoveToReplication(TransportReplicationAction.ReplicationPhase replicationPhase) {
super.finishAndMoveToReplication(replicationPhase);
movedToReplication.set(true);
}
};
ShardRouting primaryShard = state.getRoutingTable().shardRoutingTable(shardId).primaryShard();
boolean executeOnPrimary = true;
if (primaryShard.relocating() && randomBoolean()) { // whether shard has been marked as relocated already (i.e. relocation completed)
isRelocated.set(true);
indexShardRouting.set(primaryShard);
executeOnPrimary = false;
}
primaryPhase.run(); primaryPhase.run();
assertThat("request was not processed on primary", request.processedOnPrimary.get(), equalTo(true)); assertThat(request.processedOnPrimary.get(), equalTo(executeOnPrimary));
final String replicaNodeId = clusterService.state().getRoutingTable().shardRoutingTable(index, shardId.id()).replicaShards().get(0).currentNodeId(); assertThat(movedToReplication.get(), equalTo(executeOnPrimary));
final List<CapturingTransport.CapturedRequest> requests = transport.getCapturedRequestsByTargetNodeAndClear().get(replicaNodeId); if (executeOnPrimary == false) {
assertThat(requests, notNullValue()); final List<CapturingTransport.CapturedRequest> requests = transport.capturedRequestsByTargetNode().get(primaryShard.relocatingNodeId());
assertThat(requests.size(), equalTo(1)); assertThat(requests, notNullValue());
assertThat("replica request was not sent", requests.get(0).action, equalTo("testAction[r]")); assertThat(requests.size(), equalTo(1));
assertThat("primary request was not delegated to relocation target", requests.get(0).action, equalTo("testAction[p]"));
}
}
public void testPrimaryPhaseExecutesDelegatedRequestOnRelocationTarget() throws InterruptedException, ExecutionException {
final String index = "test";
final ShardId shardId = new ShardId(index, "_na_", 0);
ClusterState state = state(index, true, ShardRoutingState.RELOCATING);
String primaryTargetNodeId = state.getRoutingTable().shardRoutingTable(shardId).primaryShard().relocatingNodeId();
// simulate execution of the primary phase on the relocation target node
state = ClusterState.builder(state).nodes(DiscoveryNodes.builder(state.nodes()).localNodeId(primaryTargetNodeId)).build();
clusterService.setState(state);
Request request = new Request(shardId).timeout("1ms");
PlainActionFuture<Response> listener = new PlainActionFuture<>();
AtomicBoolean movedToReplication = new AtomicBoolean();
TransportReplicationAction.PrimaryPhase primaryPhase = action.new PrimaryPhase(request, createTransportChannel(listener)) {
@Override
void finishAndMoveToReplication(TransportReplicationAction.ReplicationPhase replicationPhase) {
super.finishAndMoveToReplication(replicationPhase);
movedToReplication.set(true);
}
};
primaryPhase.run();
assertThat("request was not processed on primary relocation target", request.processedOnPrimary.get(), equalTo(true));
assertThat(movedToReplication.get(), equalTo(true));
} }
public void testAddedReplicaAfterPrimaryOperation() { public void testAddedReplicaAfterPrimaryOperation() {
final String index = "test"; final String index = "test";
final ShardId shardId = new ShardId(index, "_na_", 0); final ShardId shardId = new ShardId(index, "_na_", 0);
// start with no replicas // start with no replicas
clusterService.setState(stateWithStartedPrimary(index, true, 0)); clusterService.setState(stateWithActivePrimary(index, true, 0));
logger.debug("--> using initial state:\n{}", clusterService.state().prettyPrint()); logger.debug("--> using initial state:\n{}", clusterService.state().prettyPrint());
final ClusterState stateWithAddedReplicas = state(index, true, ShardRoutingState.STARTED, randomBoolean() ? ShardRoutingState.INITIALIZING : ShardRoutingState.STARTED); final ClusterState stateWithAddedReplicas = state(index, true, ShardRoutingState.STARTED, randomBoolean() ? ShardRoutingState.INITIALIZING : ShardRoutingState.STARTED);
final Action actionWithAddedReplicaAfterPrimaryOp = new Action(Settings.EMPTY, "testAction", transportService, clusterService, threadPool) { final Action actionWithAddedReplicaAfterPrimaryOp = new Action(Settings.EMPTY, "testAction", transportService, clusterService, threadPool) {
@Override @Override
protected Tuple<Response, Request> shardOperationOnPrimary(MetaData metaData, Request shardRequest) throws Throwable { protected Tuple<Response, Request> shardOperationOnPrimary(MetaData metaData, Request shardRequest) throws Exception {
final Tuple<Response, Request> operationOnPrimary = super.shardOperationOnPrimary(metaData, shardRequest); final Tuple<Response, Request> operationOnPrimary = super.shardOperationOnPrimary(metaData, shardRequest);
// add replicas after primary operation // add replicas after primary operation
((TestClusterService) clusterService).setState(stateWithAddedReplicas); ((TestClusterService) clusterService).setState(stateWithAddedReplicas);
@ -302,13 +344,13 @@ public class TransportReplicationActionTests extends ESTestCase {
final String index = "test"; final String index = "test";
final ShardId shardId = new ShardId(index, "_na_", 0); final ShardId shardId = new ShardId(index, "_na_", 0);
// start with a replica // start with a replica
clusterService.setState(state(index, true, ShardRoutingState.STARTED, randomBoolean() ? ShardRoutingState.INITIALIZING : ShardRoutingState.STARTED)); clusterService.setState(state(index, true, ShardRoutingState.STARTED, randomBoolean() ? ShardRoutingState.INITIALIZING : ShardRoutingState.STARTED));
logger.debug("--> using initial state:\n{}", clusterService.state().prettyPrint()); logger.debug("--> using initial state:\n{}", clusterService.state().prettyPrint());
final ClusterState stateWithRelocatingReplica = state(index, true, ShardRoutingState.STARTED, ShardRoutingState.RELOCATING); final ClusterState stateWithRelocatingReplica = state(index, true, ShardRoutingState.STARTED, ShardRoutingState.RELOCATING);
final Action actionWithRelocatingReplicasAfterPrimaryOp = new Action(Settings.EMPTY, "testAction", transportService, clusterService, threadPool) { final Action actionWithRelocatingReplicasAfterPrimaryOp = new Action(Settings.EMPTY, "testAction", transportService, clusterService, threadPool) {
@Override @Override
protected Tuple<Response, Request> shardOperationOnPrimary(MetaData metaData, Request shardRequest) throws Throwable { protected Tuple<Response, Request> shardOperationOnPrimary(MetaData metaData, Request shardRequest) throws Exception {
final Tuple<Response, Request> operationOnPrimary = super.shardOperationOnPrimary(metaData, shardRequest); final Tuple<Response, Request> operationOnPrimary = super.shardOperationOnPrimary(metaData, shardRequest);
// set replica to relocating // set replica to relocating
((TestClusterService) clusterService).setState(stateWithRelocatingReplica); ((TestClusterService) clusterService).setState(stateWithRelocatingReplica);
@ -341,7 +383,7 @@ public class TransportReplicationActionTests extends ESTestCase {
final Action actionWithDeletedIndexAfterPrimaryOp = new Action(Settings.EMPTY, "testAction", transportService, clusterService, threadPool) { final Action actionWithDeletedIndexAfterPrimaryOp = new Action(Settings.EMPTY, "testAction", transportService, clusterService, threadPool) {
@Override @Override
protected Tuple<Response, Request> shardOperationOnPrimary(MetaData metaData, Request shardRequest) throws Throwable { protected Tuple<Response, Request> shardOperationOnPrimary(MetaData metaData, Request shardRequest) throws Exception {
final Tuple<Response, Request> operationOnPrimary = super.shardOperationOnPrimary(metaData, shardRequest); final Tuple<Response, Request> operationOnPrimary = super.shardOperationOnPrimary(metaData, shardRequest);
// delete index after primary op // delete index after primary op
((TestClusterService) clusterService).setState(stateWithDeletedIndex); ((TestClusterService) clusterService).setState(stateWithDeletedIndex);
@ -432,7 +474,13 @@ public class TransportReplicationActionTests extends ESTestCase {
final String index = "test"; final String index = "test";
final ShardId shardId = new ShardId(index, "_na_", 0); final ShardId shardId = new ShardId(index, "_na_", 0);
clusterService.setState(stateWithStartedPrimary(index, true, randomInt(5))); ClusterState state = stateWithActivePrimary(index, true, randomInt(5));
ShardRouting primaryShard = state.getRoutingTable().shardRoutingTable(shardId).primaryShard();
if (primaryShard.relocating() && randomBoolean()) {
// simulate execution of the replication phase on the relocation target node after relocation source was marked as relocated
state = ClusterState.builder(state).nodes(DiscoveryNodes.builder(state.nodes()).localNodeId(primaryShard.relocatingNodeId())).build();
}
clusterService.setState(state);
final IndexShardRoutingTable shardRoutingTable = clusterService.state().routingTable().index(index).shard(shardId.id()); final IndexShardRoutingTable shardRoutingTable = clusterService.state().routingTable().index(index).shard(shardId.id());
int assignedReplicas = 0; int assignedReplicas = 0;
@ -455,12 +503,19 @@ public class TransportReplicationActionTests extends ESTestCase {
final String index = "test"; final String index = "test";
final ShardId shardId = new ShardId(index, "_na_", 0); final ShardId shardId = new ShardId(index, "_na_", 0);
ClusterState state = stateWithStartedPrimary(index, true, randomInt(5)); ClusterState state = stateWithActivePrimary(index, true, randomInt(5));
MetaData.Builder metaData = MetaData.builder(state.metaData()); MetaData.Builder metaData = MetaData.builder(state.metaData());
Settings.Builder settings = Settings.builder().put(metaData.get(index).getSettings()); Settings.Builder settings = Settings.builder().put(metaData.get(index).getSettings());
settings.put(IndexMetaData.SETTING_SHADOW_REPLICAS, true); settings.put(IndexMetaData.SETTING_SHADOW_REPLICAS, true);
metaData.put(IndexMetaData.builder(metaData.get(index)).settings(settings)); metaData.put(IndexMetaData.builder(metaData.get(index)).settings(settings));
clusterService.setState(ClusterState.builder(state).metaData(metaData)); state = ClusterState.builder(state).metaData(metaData).build();
ShardRouting primaryShard = state.getRoutingTable().shardRoutingTable(shardId).primaryShard();
if (primaryShard.relocating() && randomBoolean()) {
// simulate execution of the primary phase on the relocation target node
state = ClusterState.builder(state).nodes(DiscoveryNodes.builder(state.nodes()).localNodeId(primaryShard.relocatingNodeId())).build();
}
clusterService.setState(state);
final IndexShardRoutingTable shardRoutingTable = clusterService.state().routingTable().index(index).shard(shardId.id()); final IndexShardRoutingTable shardRoutingTable = clusterService.state().routingTable().index(index).shard(shardId.id());
int assignedReplicas = 0; int assignedReplicas = 0;
@ -507,8 +562,9 @@ public class TransportReplicationActionTests extends ESTestCase {
assertEquals(request.shardId, replicationRequest.shardId); assertEquals(request.shardId, replicationRequest.shardId);
} }
String localNodeId = clusterService.state().getNodes().localNodeId();
// no request was sent to the local node // no request was sent to the local node
assertThat(nodesSentTo.keySet(), not(hasItem(clusterService.state().getNodes().localNodeId()))); assertThat(nodesSentTo.keySet(), not(hasItem(localNodeId)));
// requests were sent to the correct shard copies // requests were sent to the correct shard copies
for (ShardRouting shard : clusterService.state().getRoutingTable().shardRoutingTable(shardId)) { for (ShardRouting shard : clusterService.state().getRoutingTable().shardRoutingTable(shardId)) {
@ -518,11 +574,11 @@ public class TransportReplicationActionTests extends ESTestCase {
if (shard.unassigned()) { if (shard.unassigned()) {
continue; continue;
} }
if (shard.primary() == false) { if (localNodeId.equals(shard.currentNodeId()) == false) {
nodesSentTo.remove(shard.currentNodeId()); assertThat(nodesSentTo.remove(shard.currentNodeId()), notNullValue());
} }
if (shard.relocating()) { if (shard.relocating() && localNodeId.equals(shard.relocatingNodeId()) == false) { // for relocating primaries, we replicate from target to source if source is marked as relocated
nodesSentTo.remove(shard.relocatingNodeId()); assertThat(nodesSentTo.remove(shard.relocatingNodeId()), notNullValue());
} }
} }
@ -629,6 +685,7 @@ public class TransportReplicationActionTests extends ESTestCase {
// shard operation should be ongoing, so the counter is at 2 // shard operation should be ongoing, so the counter is at 2
// we have to wait here because increment happens in thread // we have to wait here because increment happens in thread
assertBusy(() -> assertIndexShardCounter(2)); assertBusy(() -> assertIndexShardCounter(2));
assertThat(transport.capturedRequests().length, equalTo(0)); assertThat(transport.capturedRequests().length, equalTo(0));
((ActionWithDelay) action).countDownLatch.countDown(); ((ActionWithDelay) action).countDownLatch.countDown();
t.join(); t.join();
@ -726,12 +783,28 @@ public class TransportReplicationActionTests extends ESTestCase {
private final AtomicInteger count = new AtomicInteger(0); private final AtomicInteger count = new AtomicInteger(0);
private final AtomicBoolean isRelocated = new AtomicBoolean(false);
private final AtomicReference<ShardRouting> indexShardRouting = new AtomicReference<>();
/* /*
* Returns testIndexShardOperationsCounter or initializes it if it was already created in this test run. * Returns testIndexShardOperationsCounter or initializes it if it was already created in this test run.
* */ * */
private synchronized Releasable getOrCreateIndexShardOperationsCounter() { private synchronized TransportReplicationAction.IndexShardReference getOrCreateIndexShardOperationsCounter() {
count.incrementAndGet(); count.incrementAndGet();
return new Releasable() { return new TransportReplicationAction.IndexShardReference() {
@Override
public boolean isRelocated() {
return isRelocated.get();
}
@Override
public ShardRouting routingEntry() {
ShardRouting shardRouting = indexShardRouting.get();
assert shardRouting != null;
return shardRouting;
}
@Override @Override
public void close() { public void close() {
count.decrementAndGet(); count.decrementAndGet();
@ -783,7 +856,7 @@ public class TransportReplicationActionTests extends ESTestCase {
} }
@Override @Override
protected Tuple<Response, Request> shardOperationOnPrimary(MetaData metaData, Request shardRequest) throws Throwable { protected Tuple<Response, Request> shardOperationOnPrimary(MetaData metaData, Request shardRequest) throws Exception {
boolean executedBefore = shardRequest.processedOnPrimary.getAndSet(true); boolean executedBefore = shardRequest.processedOnPrimary.getAndSet(true);
assert executedBefore == false : "request has already been executed on the primary"; assert executedBefore == false : "request has already been executed on the primary";
return new Tuple<>(new Response(), shardRequest); return new Tuple<>(new Response(), shardRequest);
@ -805,7 +878,11 @@ public class TransportReplicationActionTests extends ESTestCase {
} }
@Override @Override
protected Releasable getIndexShardOperationsCounter(ShardId shardId) { protected IndexShardReference getIndexShardReferenceOnPrimary(ShardId shardId) {
return getOrCreateIndexShardOperationsCounter();
}
protected IndexShardReference getIndexShardReferenceOnReplica(ShardId shardId) {
return getOrCreateIndexShardOperationsCounter(); return getOrCreateIndexShardOperationsCounter();
} }
} }
@ -832,7 +909,7 @@ public class TransportReplicationActionTests extends ESTestCase {
} }
@Override @Override
protected Tuple<Response, Request> shardOperationOnPrimary(MetaData metaData, Request shardRequest) throws Throwable { protected Tuple<Response, Request> shardOperationOnPrimary(MetaData metaData, Request shardRequest) {
return throwException(shardRequest.shardId()); return throwException(shardRequest.shardId());
} }
@ -870,7 +947,7 @@ public class TransportReplicationActionTests extends ESTestCase {
} }
@Override @Override
protected Tuple<Response, Request> shardOperationOnPrimary(MetaData metaData, Request shardRequest) throws Throwable { protected Tuple<Response, Request> shardOperationOnPrimary(MetaData metaData, Request shardRequest) throws Exception {
awaitLatch(); awaitLatch();
return new Tuple<>(new Response(), shardRequest); return new Tuple<>(new Response(), shardRequest);
} }

View File

@ -20,6 +20,7 @@
package org.elasticsearch.cluster.action.shard; package org.elasticsearch.cluster.action.shard;
import org.apache.lucene.index.CorruptIndexException; import org.apache.lucene.index.CorruptIndexException;
import org.elasticsearch.action.support.replication.ClusterStateCreationUtils;
import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateObserver; import org.elasticsearch.cluster.ClusterStateObserver;
@ -33,7 +34,6 @@ import org.elasticsearch.cluster.routing.ShardsIterator;
import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.discovery.Discovery; import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.index.shard.ShardNotFoundException;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.cluster.TestClusterService; import org.elasticsearch.test.cluster.TestClusterService;
import org.elasticsearch.test.transport.CapturingTransport; import org.elasticsearch.test.transport.CapturingTransport;
@ -55,7 +55,6 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import java.util.function.LongConsumer; import java.util.function.LongConsumer;
import static org.elasticsearch.action.support.replication.ClusterStateCreationUtils.stateWithStartedPrimary;
import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.is;
@ -127,7 +126,7 @@ public class ShardStateActionTests extends ESTestCase {
public void testSuccess() throws InterruptedException { public void testSuccess() throws InterruptedException {
final String index = "test"; final String index = "test";
clusterService.setState(stateWithStartedPrimary(index, true, randomInt(5))); clusterService.setState(ClusterStateCreationUtils.stateWithActivePrimary(index, true, randomInt(5)));
String indexUUID = clusterService.state().metaData().index(index).getIndexUUID(); String indexUUID = clusterService.state().metaData().index(index).getIndexUUID();
@ -169,7 +168,7 @@ public class ShardStateActionTests extends ESTestCase {
public void testNoMaster() throws InterruptedException { public void testNoMaster() throws InterruptedException {
final String index = "test"; final String index = "test";
clusterService.setState(stateWithStartedPrimary(index, true, randomInt(5))); clusterService.setState(ClusterStateCreationUtils.stateWithActivePrimary(index, true, randomInt(5)));
DiscoveryNodes.Builder noMasterBuilder = DiscoveryNodes.builder(clusterService.state().nodes()); DiscoveryNodes.Builder noMasterBuilder = DiscoveryNodes.builder(clusterService.state().nodes());
noMasterBuilder.masterNodeId(null); noMasterBuilder.masterNodeId(null);
@ -207,7 +206,7 @@ public class ShardStateActionTests extends ESTestCase {
public void testMasterChannelException() throws InterruptedException { public void testMasterChannelException() throws InterruptedException {
final String index = "test"; final String index = "test";
clusterService.setState(stateWithStartedPrimary(index, true, randomInt(5))); clusterService.setState(ClusterStateCreationUtils.stateWithActivePrimary(index, true, randomInt(5)));
String indexUUID = clusterService.state().metaData().index(index).getIndexUUID(); String indexUUID = clusterService.state().metaData().index(index).getIndexUUID();
@ -264,7 +263,7 @@ public class ShardStateActionTests extends ESTestCase {
public void testUnhandledFailure() { public void testUnhandledFailure() {
final String index = "test"; final String index = "test";
clusterService.setState(stateWithStartedPrimary(index, true, randomInt(5))); clusterService.setState(ClusterStateCreationUtils.stateWithActivePrimary(index, true, randomInt(5)));
String indexUUID = clusterService.state().metaData().index(index).getIndexUUID(); String indexUUID = clusterService.state().metaData().index(index).getIndexUUID();
@ -294,7 +293,7 @@ public class ShardStateActionTests extends ESTestCase {
public void testShardNotFound() throws InterruptedException { public void testShardNotFound() throws InterruptedException {
final String index = "test"; final String index = "test";
clusterService.setState(stateWithStartedPrimary(index, true, randomInt(5))); clusterService.setState(ClusterStateCreationUtils.stateWithActivePrimary(index, true, randomInt(5)));
String indexUUID = clusterService.state().metaData().index(index).getIndexUUID(); String indexUUID = clusterService.state().metaData().index(index).getIndexUUID();

View File

@ -57,6 +57,8 @@ import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
@ -108,6 +110,7 @@ import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier; import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
@ -125,6 +128,7 @@ import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitC
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchHits; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchHits;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
/** /**
* Simple unit-test IndexShard related operations. * Simple unit-test IndexShard related operations.
@ -316,36 +320,41 @@ public class IndexShardTests extends ESSingleNodeTestCase {
} }
public void testDeleteIndexDecreasesCounter() throws InterruptedException, ExecutionException, IOException { public void testDeleteIndexPreventsNewOperations() throws InterruptedException, ExecutionException, IOException {
assertAcked(client().admin().indices().prepareCreate("test").setSettings(Settings.builder().put("index.number_of_shards", 1).put("index.number_of_replicas", 0)).get()); assertAcked(client().admin().indices().prepareCreate("test").setSettings(Settings.builder().put("index.number_of_shards", 1).put("index.number_of_replicas", 0)).get());
ensureGreen("test"); ensureGreen("test");
IndicesService indicesService = getInstanceFromNode(IndicesService.class); IndicesService indicesService = getInstanceFromNode(IndicesService.class);
IndexService indexService = indicesService.indexServiceSafe("test"); IndexService indexService = indicesService.indexServiceSafe("test");
IndexShard indexShard = indexService.getShardOrNull(0); IndexShard indexShard = indexService.getShardOrNull(0);
client().admin().indices().prepareDelete("test").get(); client().admin().indices().prepareDelete("test").get();
assertThat(indexShard.getOperationsCount(), equalTo(0)); assertThat(indexShard.getActiveOperationsCount(), equalTo(0));
try { try {
indexShard.incrementOperationCounter(); indexShard.acquirePrimaryOperationLock();
fail("we should not be able to increment anymore");
} catch (IndexShardClosedException e) {
// expected
}
try {
indexShard.acquireReplicaOperationLock();
fail("we should not be able to increment anymore"); fail("we should not be able to increment anymore");
} catch (IndexShardClosedException e) { } catch (IndexShardClosedException e) {
// expected // expected
} }
} }
public void testIndexShardCounter() throws InterruptedException, ExecutionException, IOException { public void testIndexOperationsCounter() throws InterruptedException, ExecutionException, IOException {
assertAcked(client().admin().indices().prepareCreate("test").setSettings(Settings.builder().put("index.number_of_shards", 1).put("index.number_of_replicas", 0)).get()); assertAcked(client().admin().indices().prepareCreate("test").setSettings(Settings.builder().put("index.number_of_shards", 1).put("index.number_of_replicas", 0)).get());
ensureGreen("test"); ensureGreen("test");
IndicesService indicesService = getInstanceFromNode(IndicesService.class); IndicesService indicesService = getInstanceFromNode(IndicesService.class);
IndexService indexService = indicesService.indexServiceSafe("test"); IndexService indexService = indicesService.indexServiceSafe("test");
IndexShard indexShard = indexService.getShardOrNull(0); IndexShard indexShard = indexService.getShardOrNull(0);
assertEquals(0, indexShard.getOperationsCount()); assertEquals(0, indexShard.getActiveOperationsCount());
indexShard.incrementOperationCounter(); Releasable operation1 = indexShard.acquirePrimaryOperationLock();
assertEquals(1, indexShard.getOperationsCount()); assertEquals(1, indexShard.getActiveOperationsCount());
indexShard.incrementOperationCounter(); Releasable operation2 = indexShard.acquirePrimaryOperationLock();
assertEquals(2, indexShard.getOperationsCount()); assertEquals(2, indexShard.getActiveOperationsCount());
indexShard.decrementOperationCounter(); Releasables.close(operation1, operation2);
indexShard.decrementOperationCounter(); assertEquals(0, indexShard.getActiveOperationsCount());
assertEquals(0, indexShard.getOperationsCount());
} }
public void testMarkAsInactiveTriggersSyncedFlush() throws Exception { public void testMarkAsInactiveTriggersSyncedFlush() throws Exception {
@ -777,6 +786,89 @@ public class IndexShardTests extends ESSingleNodeTestCase {
assertEquals(total + 1, shard.flushStats().getTotal()); assertEquals(total + 1, shard.flushStats().getTotal());
} }
public void testLockingBeforeAndAfterRelocated() throws Exception {
assertAcked(client().admin().indices().prepareCreate("test").setSettings(
Settings.builder().put("index.number_of_shards", 1).put("index.number_of_replicas", 0)
).get());
ensureGreen();
IndicesService indicesService = getInstanceFromNode(IndicesService.class);
IndexService test = indicesService.indexService("test");
final IndexShard shard = test.getShardOrNull(0);
CountDownLatch latch = new CountDownLatch(1);
Thread recoveryThread = new Thread(() -> {
latch.countDown();
shard.relocated("simulated recovery");
});
try (Releasable ignored = shard.acquirePrimaryOperationLock()) {
// start finalization of recovery
recoveryThread.start();
latch.await();
// recovery can only be finalized after we release the current primaryOperationLock
assertThat(shard.state(), equalTo(IndexShardState.STARTED));
}
// recovery can be now finalized
recoveryThread.join();
assertThat(shard.state(), equalTo(IndexShardState.RELOCATED));
try (Releasable ignored = shard.acquirePrimaryOperationLock()) {
// lock can again be acquired
assertThat(shard.state(), equalTo(IndexShardState.RELOCATED));
}
}
public void testStressRelocated() throws Exception {
assertAcked(client().admin().indices().prepareCreate("test").setSettings(
Settings.builder().put("index.number_of_shards", 1).put("index.number_of_replicas", 0)
).get());
ensureGreen();
IndicesService indicesService = getInstanceFromNode(IndicesService.class);
IndexService test = indicesService.indexService("test");
final IndexShard shard = test.getShardOrNull(0);
final int numThreads = randomIntBetween(2, 4);
Thread[] indexThreads = new Thread[numThreads];
CountDownLatch somePrimaryOperationLockAcquired = new CountDownLatch(1);
CyclicBarrier barrier = new CyclicBarrier(numThreads + 1);
for (int i = 0; i < indexThreads.length; i++) {
indexThreads[i] = new Thread() {
@Override
public void run() {
try (Releasable operationLock = shard.acquirePrimaryOperationLock()) {
somePrimaryOperationLockAcquired.countDown();
barrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
throw new RuntimeException(e);
}
}
};
indexThreads[i].start();
}
AtomicBoolean relocated = new AtomicBoolean();
final Thread recoveryThread = new Thread(() -> {
shard.relocated("simulated recovery");
relocated.set(true);
});
// ensure we wait for at least one primary operation lock to be acquired
somePrimaryOperationLockAcquired.await();
// start recovery thread
recoveryThread.start();
assertThat(relocated.get(), equalTo(false));
assertThat(shard.getActiveOperationsCount(), greaterThan(0));
// ensure we only transition to RELOCATED state after pending operations completed
assertThat(shard.state(), equalTo(IndexShardState.STARTED));
// complete pending operations
barrier.await();
// complete recovery/relocation
recoveryThread.join();
// ensure relocated successfully once pending operations are done
assertThat(relocated.get(), equalTo(true));
assertThat(shard.state(), equalTo(IndexShardState.RELOCATED));
assertThat(shard.getActiveOperationsCount(), equalTo(0));
for (Thread indexThread : indexThreads) {
indexThread.join();
}
}
public void testRecoverFromStore() throws IOException { public void testRecoverFromStore() throws IOException {
createIndex("test"); createIndex("test");
ensureGreen(); ensureGreen();
@ -857,6 +949,27 @@ public class IndexShardTests extends ESSingleNodeTestCase {
assertHitCount(client().prepareSearch().get(), 1); assertHitCount(client().prepareSearch().get(), 1);
} }
public void testRecoveryFailsAfterMovingToRelocatedState() throws InterruptedException {
createIndex("test");
ensureGreen();
IndicesService indicesService = getInstanceFromNode(IndicesService.class);
IndexService test = indicesService.indexService("test");
final IndexShard shard = test.getShardOrNull(0);
ShardRouting origRouting = shard.routingEntry();
assertThat(shard.state(), equalTo(IndexShardState.STARTED));
ShardRouting inRecoveryRouting = new ShardRouting(origRouting);
ShardRoutingHelper.relocate(inRecoveryRouting, "some_node");
shard.updateRoutingEntry(inRecoveryRouting, true);
shard.relocated("simulate mark as relocated");
assertThat(shard.state(), equalTo(IndexShardState.RELOCATED));
ShardRouting failedRecoveryRouting = new ShardRouting(origRouting);
try {
shard.updateRoutingEntry(failedRecoveryRouting, true);
fail("Expected IndexShardRelocatedException");
} catch (IndexShardRelocatedException expected) {
}
}
public void testRestoreShard() throws IOException { public void testRestoreShard() throws IOException {
createIndex("test"); createIndex("test");
createIndex("test_target"); createIndex("test_target");

View File

@ -58,6 +58,7 @@ import static org.elasticsearch.index.shard.IndexShardState.CLOSED;
import static org.elasticsearch.index.shard.IndexShardState.CREATED; import static org.elasticsearch.index.shard.IndexShardState.CREATED;
import static org.elasticsearch.index.shard.IndexShardState.POST_RECOVERY; import static org.elasticsearch.index.shard.IndexShardState.POST_RECOVERY;
import static org.elasticsearch.index.shard.IndexShardState.RECOVERING; import static org.elasticsearch.index.shard.IndexShardState.RECOVERING;
import static org.elasticsearch.index.shard.IndexShardState.RELOCATED;
import static org.elasticsearch.index.shard.IndexShardState.STARTED; import static org.elasticsearch.index.shard.IndexShardState.STARTED;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.equalTo;
@ -181,7 +182,7 @@ public class IndicesLifecycleListenerIT extends ESIntegTestCase {
ensureGreen(); ensureGreen();
//the 3 relocated shards get closed on the first node //the 3 relocated shards get closed on the first node
assertShardStatesMatch(stateChangeListenerNode1, 3, CLOSED); assertShardStatesMatch(stateChangeListenerNode1, 3, RELOCATED, CLOSED);
//the 3 relocated shards get created on the second node //the 3 relocated shards get created on the second node
assertShardStatesMatch(stateChangeListenerNode2, 3, CREATED, RECOVERING, POST_RECOVERY, STARTED); assertShardStatesMatch(stateChangeListenerNode2, 3, CREATED, RECOVERING, POST_RECOVERY, STARTED);

View File

@ -23,6 +23,7 @@ import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Strings; import org.elasticsearch.common.Strings;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.index.IndexService; import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShard;
@ -110,8 +111,7 @@ public class SyncedFlushSingleNodeTests extends ESSingleNodeTestCase {
SyncedFlushService flushService = getInstanceFromNode(SyncedFlushService.class); SyncedFlushService flushService = getInstanceFromNode(SyncedFlushService.class);
final ShardId shardId = shard.shardId(); final ShardId shardId = shard.shardId();
shard.incrementOperationCounter(); try (Releasable operationLock = shard.acquirePrimaryOperationLock()) {
try {
SyncedFlushUtil.LatchedListener<ShardsSyncedFlushResult> listener = new SyncedFlushUtil.LatchedListener<>(); SyncedFlushUtil.LatchedListener<ShardsSyncedFlushResult> listener = new SyncedFlushUtil.LatchedListener<>();
flushService.attemptSyncedFlush(shardId, listener); flushService.attemptSyncedFlush(shardId, listener);
listener.latch.await(); listener.latch.await();
@ -121,8 +121,6 @@ public class SyncedFlushSingleNodeTests extends ESSingleNodeTestCase {
assertEquals(0, syncedFlushResult.successfulShards()); assertEquals(0, syncedFlushResult.successfulShards());
assertNotEquals(0, syncedFlushResult.totalShards()); assertNotEquals(0, syncedFlushResult.totalShards());
assertEquals("[1] ongoing operations on primary", syncedFlushResult.failureReason()); assertEquals("[1] ongoing operations on primary", syncedFlushResult.failureReason());
} finally {
shard.decrementOperationCounter();
} }
} }

View File

@ -0,0 +1,89 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.indices.recovery;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.junit.annotations.TestLogging;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.hamcrest.Matchers.equalTo;
@TestLogging("_root:DEBUG")
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST)
public class IndexPrimaryRelocationIT extends ESIntegTestCase {
private static final int RELOCATION_COUNT = 25;
public void testPrimaryRelocationWhileIndexing() throws Exception {
internalCluster().ensureAtLeastNumDataNodes(randomIntBetween(2, 3));
client().admin().indices().prepareCreate("test")
.setSettings(Settings.settingsBuilder().put("index.number_of_shards", 1).put("index.number_of_replicas", 0))
.addMapping("type", "field", "type=string")
.get();
ensureGreen("test");
final AtomicBoolean finished = new AtomicBoolean(false);
Thread indexingThread = new Thread() {
@Override
public void run() {
while (finished.get() == false) {
IndexResponse indexResponse = client().prepareIndex("test", "type", "id").setSource("field", "value").get();
assertThat("deleted document was found", indexResponse.isCreated(), equalTo(true));
DeleteResponse deleteResponse = client().prepareDelete("test", "type", "id").get();
assertThat("indexed document was not found", deleteResponse.isFound(), equalTo(true));
}
}
};
indexingThread.start();
ClusterState initialState = client().admin().cluster().prepareState().get().getState();
DiscoveryNode[] dataNodes = initialState.getNodes().dataNodes().values().toArray(DiscoveryNode.class);
DiscoveryNode relocationSource = initialState.getNodes().dataNodes().get(initialState.getRoutingTable().shardRoutingTable("test", 0).primaryShard().currentNodeId());
for (int i = 0; i < RELOCATION_COUNT; i++) {
DiscoveryNode relocationTarget = randomFrom(dataNodes);
while (relocationTarget.equals(relocationSource)) {
relocationTarget = randomFrom(dataNodes);
}
logger.info("--> [iteration {}] relocating from {} to {} ", i, relocationSource.getName(), relocationTarget.getName());
client().admin().cluster().prepareReroute()
.add(new MoveAllocationCommand("test", 0, relocationSource.getId(), relocationTarget.getId()))
.execute().actionGet();
ClusterHealthResponse clusterHealthResponse = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForRelocatingShards(0).execute().actionGet();
assertThat(clusterHealthResponse.isTimedOut(), equalTo(false));
logger.info("--> [iteration {}] relocation complete", i);
relocationSource = relocationTarget;
if (indexingThread.isAlive() == false) { // indexing process aborted early, no need for more relocations as test has already failed
break;
}
}
finished.set(true);
indexingThread.join();
}
}

View File

@ -286,7 +286,7 @@ public class IndexRecoveryIT extends ESIntegTestCase {
assertRecoveryState(nodeARecoveryStates.get(0), 0, Type.STORE, Stage.DONE, nodeA, nodeA, false); assertRecoveryState(nodeARecoveryStates.get(0), 0, Type.STORE, Stage.DONE, nodeA, nodeA, false);
validateIndexRecoveryState(nodeARecoveryStates.get(0).getIndex()); validateIndexRecoveryState(nodeARecoveryStates.get(0).getIndex());
assertOnGoingRecoveryState(nodeBRecoveryStates.get(0), 0, Type.RELOCATION, nodeA, nodeB, false); assertOnGoingRecoveryState(nodeBRecoveryStates.get(0), 0, Type.PRIMARY_RELOCATION, nodeA, nodeB, false);
validateIndexRecoveryState(nodeBRecoveryStates.get(0).getIndex()); validateIndexRecoveryState(nodeBRecoveryStates.get(0).getIndex());
logger.info("--> request node recovery stats"); logger.info("--> request node recovery stats");
@ -339,7 +339,7 @@ public class IndexRecoveryIT extends ESIntegTestCase {
recoveryStates = response.shardRecoveryStates().get(INDEX_NAME); recoveryStates = response.shardRecoveryStates().get(INDEX_NAME);
assertThat(recoveryStates.size(), equalTo(1)); assertThat(recoveryStates.size(), equalTo(1));
assertRecoveryState(recoveryStates.get(0), 0, Type.RELOCATION, Stage.DONE, nodeA, nodeB, false); assertRecoveryState(recoveryStates.get(0), 0, Type.PRIMARY_RELOCATION, Stage.DONE, nodeA, nodeB, false);
validateIndexRecoveryState(recoveryStates.get(0).getIndex()); validateIndexRecoveryState(recoveryStates.get(0).getIndex());
statsResponse = client().admin().cluster().prepareNodesStats().clear().setIndices(new CommonStatsFlags(CommonStatsFlags.Flag.Recovery)).get(); statsResponse = client().admin().cluster().prepareNodesStats().clear().setIndices(new CommonStatsFlags(CommonStatsFlags.Flag.Recovery)).get();
@ -400,7 +400,7 @@ public class IndexRecoveryIT extends ESIntegTestCase {
assertRecoveryState(nodeARecoveryStates.get(0), 0, Type.REPLICA, Stage.DONE, nodeB, nodeA, false); assertRecoveryState(nodeARecoveryStates.get(0), 0, Type.REPLICA, Stage.DONE, nodeB, nodeA, false);
validateIndexRecoveryState(nodeARecoveryStates.get(0).getIndex()); validateIndexRecoveryState(nodeARecoveryStates.get(0).getIndex());
assertRecoveryState(nodeBRecoveryStates.get(0), 0, Type.RELOCATION, Stage.DONE, nodeA, nodeB, false); assertRecoveryState(nodeBRecoveryStates.get(0), 0, Type.PRIMARY_RELOCATION, Stage.DONE, nodeA, nodeB, false);
validateIndexRecoveryState(nodeBRecoveryStates.get(0).getIndex()); validateIndexRecoveryState(nodeBRecoveryStates.get(0).getIndex());
// relocations of replicas are marked as REPLICA and the source node is the node holding the primary (B) // relocations of replicas are marked as REPLICA and the source node is the node holding the primary (B)
@ -421,7 +421,7 @@ public class IndexRecoveryIT extends ESIntegTestCase {
nodeCRecoveryStates = findRecoveriesForTargetNode(nodeC, recoveryStates); nodeCRecoveryStates = findRecoveriesForTargetNode(nodeC, recoveryStates);
assertThat(nodeCRecoveryStates.size(), equalTo(1)); assertThat(nodeCRecoveryStates.size(), equalTo(1));
assertRecoveryState(nodeBRecoveryStates.get(0), 0, Type.RELOCATION, Stage.DONE, nodeA, nodeB, false); assertRecoveryState(nodeBRecoveryStates.get(0), 0, Type.PRIMARY_RELOCATION, Stage.DONE, nodeA, nodeB, false);
validateIndexRecoveryState(nodeBRecoveryStates.get(0).getIndex()); validateIndexRecoveryState(nodeBRecoveryStates.get(0).getIndex());
// relocations of replicas are marked as REPLICA and the source node is the node holding the primary (B) // relocations of replicas are marked as REPLICA and the source node is the node holding the primary (B)
@ -503,7 +503,7 @@ public class IndexRecoveryIT extends ESIntegTestCase {
final IndexRequestBuilder[] docs = new IndexRequestBuilder[numDocs]; final IndexRequestBuilder[] docs = new IndexRequestBuilder[numDocs];
for (int i = 0; i < numDocs; i++) { for (int i = 0; i < numDocs; i++) {
docs[i] = client().prepareIndex(INDEX_NAME, INDEX_TYPE). docs[i] = client().prepareIndex(name, INDEX_TYPE).
setSource("foo-int", randomInt(), setSource("foo-int", randomInt(),
"foo-string", randomAsciiOfLength(32), "foo-string", randomAsciiOfLength(32),
"foo-float", randomFloat()); "foo-float", randomFloat());
@ -511,8 +511,8 @@ public class IndexRecoveryIT extends ESIntegTestCase {
indexRandom(true, docs); indexRandom(true, docs);
flush(); flush();
assertThat(client().prepareSearch(INDEX_NAME).setSize(0).get().getHits().totalHits(), equalTo((long) numDocs)); assertThat(client().prepareSearch(name).setSize(0).get().getHits().totalHits(), equalTo((long) numDocs));
return client().admin().indices().prepareStats(INDEX_NAME).execute().actionGet(); return client().admin().indices().prepareStats(name).execute().actionGet();
} }
private void validateIndexRecoveryState(RecoveryState.Index indexState) { private void validateIndexRecoveryState(RecoveryState.Index indexState) {

View File

@ -69,7 +69,7 @@ public class RecoverySourceHandlerTests extends ESTestCase {
StartRecoveryRequest request = new StartRecoveryRequest(shardId, StartRecoveryRequest request = new StartRecoveryRequest(shardId,
new DiscoveryNode("b", DummyTransportAddress.INSTANCE, Version.CURRENT), new DiscoveryNode("b", DummyTransportAddress.INSTANCE, Version.CURRENT),
new DiscoveryNode("b", DummyTransportAddress.INSTANCE, Version.CURRENT), new DiscoveryNode("b", DummyTransportAddress.INSTANCE, Version.CURRENT),
randomBoolean(), null, RecoveryState.Type.STORE, randomLong()); null, RecoveryState.Type.STORE, randomLong());
Store store = newStore(createTempDir()); Store store = newStore(createTempDir());
RecoverySourceHandler handler = new RecoverySourceHandler(null, request, recoverySettings, null, logger); RecoverySourceHandler handler = new RecoverySourceHandler(null, request, recoverySettings, null, logger);
Directory dir = store.directory(); Directory dir = store.directory();
@ -118,7 +118,7 @@ public class RecoverySourceHandlerTests extends ESTestCase {
StartRecoveryRequest request = new StartRecoveryRequest(shardId, StartRecoveryRequest request = new StartRecoveryRequest(shardId,
new DiscoveryNode("b", DummyTransportAddress.INSTANCE, Version.CURRENT), new DiscoveryNode("b", DummyTransportAddress.INSTANCE, Version.CURRENT),
new DiscoveryNode("b", DummyTransportAddress.INSTANCE, Version.CURRENT), new DiscoveryNode("b", DummyTransportAddress.INSTANCE, Version.CURRENT),
randomBoolean(), null, RecoveryState.Type.STORE, randomLong()); null, RecoveryState.Type.STORE, randomLong());
Path tempDir = createTempDir(); Path tempDir = createTempDir();
Store store = newStore(tempDir, false); Store store = newStore(tempDir, false);
AtomicBoolean failedEngine = new AtomicBoolean(false); AtomicBoolean failedEngine = new AtomicBoolean(false);
@ -181,7 +181,7 @@ public class RecoverySourceHandlerTests extends ESTestCase {
StartRecoveryRequest request = new StartRecoveryRequest(shardId, StartRecoveryRequest request = new StartRecoveryRequest(shardId,
new DiscoveryNode("b", DummyTransportAddress.INSTANCE, Version.CURRENT), new DiscoveryNode("b", DummyTransportAddress.INSTANCE, Version.CURRENT),
new DiscoveryNode("b", DummyTransportAddress.INSTANCE, Version.CURRENT), new DiscoveryNode("b", DummyTransportAddress.INSTANCE, Version.CURRENT),
randomBoolean(), null, RecoveryState.Type.STORE, randomLong()); null, RecoveryState.Type.STORE, randomLong());
Path tempDir = createTempDir(); Path tempDir = createTempDir();
Store store = newStore(tempDir, false); Store store = newStore(tempDir, false);
AtomicBoolean failedEngine = new AtomicBoolean(false); AtomicBoolean failedEngine = new AtomicBoolean(false);

View File

@ -43,11 +43,9 @@ public class StartRecoveryRequestTests extends ESTestCase {
new ShardId("test", "_na_", 0), new ShardId("test", "_na_", 0),
new DiscoveryNode("a", new LocalTransportAddress("1"), targetNodeVersion), new DiscoveryNode("a", new LocalTransportAddress("1"), targetNodeVersion),
new DiscoveryNode("b", new LocalTransportAddress("1"), targetNodeVersion), new DiscoveryNode("b", new LocalTransportAddress("1"), targetNodeVersion),
true,
Store.MetadataSnapshot.EMPTY, Store.MetadataSnapshot.EMPTY,
RecoveryState.Type.RELOCATION, RecoveryState.Type.PRIMARY_RELOCATION,
1L 1L
); );
ByteArrayOutputStream outBuffer = new ByteArrayOutputStream(); ByteArrayOutputStream outBuffer = new ByteArrayOutputStream();
OutputStreamStreamOutput out = new OutputStreamStreamOutput(outBuffer); OutputStreamStreamOutput out = new OutputStreamStreamOutput(outBuffer);
@ -63,7 +61,6 @@ public class StartRecoveryRequestTests extends ESTestCase {
assertThat(outRequest.shardId(), equalTo(inRequest.shardId())); assertThat(outRequest.shardId(), equalTo(inRequest.shardId()));
assertThat(outRequest.sourceNode(), equalTo(inRequest.sourceNode())); assertThat(outRequest.sourceNode(), equalTo(inRequest.sourceNode()));
assertThat(outRequest.targetNode(), equalTo(inRequest.targetNode())); assertThat(outRequest.targetNode(), equalTo(inRequest.targetNode()));
assertThat(outRequest.markAsRelocated(), equalTo(inRequest.markAsRelocated()));
assertThat(outRequest.metadataSnapshot().asMap(), equalTo(inRequest.metadataSnapshot().asMap())); assertThat(outRequest.metadataSnapshot().asMap(), equalTo(inRequest.metadataSnapshot().asMap()));
assertThat(outRequest.recoveryId(), equalTo(inRequest.recoveryId())); assertThat(outRequest.recoveryId(), equalTo(inRequest.recoveryId()));
assertThat(outRequest.recoveryType(), equalTo(inRequest.recoveryType())); assertThat(outRequest.recoveryType(), equalTo(inRequest.recoveryType()));

View File

@ -151,7 +151,7 @@ public class FullRollingRestartIT extends ESIntegTestCase {
ClusterState state = client().admin().cluster().prepareState().get().getState(); ClusterState state = client().admin().cluster().prepareState().get().getState();
RecoveryResponse recoveryResponse = client().admin().indices().prepareRecoveries("test").get(); RecoveryResponse recoveryResponse = client().admin().indices().prepareRecoveries("test").get();
for (RecoveryState recoveryState : recoveryResponse.shardRecoveryStates().get("test")) { for (RecoveryState recoveryState : recoveryResponse.shardRecoveryStates().get("test")) {
assertTrue("relocated from: " + recoveryState.getSourceNode() + " to: " + recoveryState.getTargetNode() + "\n" + state.prettyPrint(), recoveryState.getType() != RecoveryState.Type.RELOCATION); assertTrue("relocated from: " + recoveryState.getSourceNode() + " to: " + recoveryState.getTargetNode() + "\n" + state.prettyPrint(), recoveryState.getType() != RecoveryState.Type.PRIMARY_RELOCATION);
} }
internalCluster().restartRandomDataNode(); internalCluster().restartRandomDataNode();
ensureGreen(); ensureGreen();
@ -159,7 +159,7 @@ public class FullRollingRestartIT extends ESIntegTestCase {
recoveryResponse = client().admin().indices().prepareRecoveries("test").get(); recoveryResponse = client().admin().indices().prepareRecoveries("test").get();
for (RecoveryState recoveryState : recoveryResponse.shardRecoveryStates().get("test")) { for (RecoveryState recoveryState : recoveryResponse.shardRecoveryStates().get("test")) {
assertTrue("relocated from: " + recoveryState.getSourceNode() + " to: " + recoveryState.getTargetNode()+ "-- \nbefore: \n" + state.prettyPrint() + "\nafter: \n" + afterState.prettyPrint(), recoveryState.getType() != RecoveryState.Type.RELOCATION); assertTrue("relocated from: " + recoveryState.getSourceNode() + " to: " + recoveryState.getTargetNode()+ "-- \nbefore: \n" + state.prettyPrint() + "\nafter: \n" + afterState.prettyPrint(), recoveryState.getType() != RecoveryState.Type.PRIMARY_RELOCATION);
} }
} }
} }

View File

@ -1036,7 +1036,7 @@ public final class InternalTestCluster extends TestCluster {
IndicesService indexServices = getInstance(IndicesService.class, nodeAndClient.name); IndicesService indexServices = getInstance(IndicesService.class, nodeAndClient.name);
for (IndexService indexService : indexServices) { for (IndexService indexService : indexServices) {
for (IndexShard indexShard : indexService) { for (IndexShard indexShard : indexService) {
assertThat("index shard counter on shard " + indexShard.shardId() + " on node " + nodeAndClient.name + " not 0", indexShard.getOperationsCount(), equalTo(0)); assertThat("index shard counter on shard " + indexShard.shardId() + " on node " + nodeAndClient.name + " not 0", indexShard.getActiveOperationsCount(), equalTo(0));
} }
} }
} }