Live primary-replica resync (no rollback) (#24841)

Adds a replication task that streams all operations from the primary's global checkpoint to all replicas.
This commit is contained in:
Yannick Welsch 2017-06-22 13:35:34 +02:00 committed by GitHub
parent 44e9c0b947
commit e41eae9f05
19 changed files with 1071 additions and 75 deletions

View File

@ -531,38 +531,6 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
}
}
/** Syncs operation result to the translog or throws a shard not available failure */
private static Translog.Location syncOperationResultOrThrow(final Engine.Result operationResult,
final Translog.Location currentLocation) throws Exception {
final Translog.Location location;
if (operationResult.hasFailure()) {
// check if any transient write operation failures should be bubbled up
Exception failure = operationResult.getFailure();
assert failure instanceof MapperParsingException : "expected mapper parsing failures. got " + failure;
if (!TransportActions.isShardNotAvailableException(failure)) {
throw failure;
} else {
location = currentLocation;
}
} else {
location = locationToSync(currentLocation, operationResult.getTranslogLocation());
}
return location;
}
private static Translog.Location locationToSync(Translog.Location current,
Translog.Location next) {
/* here we are moving forward in the translog with each operation. Under the hood this might
* cross translog files which is ok since from the user perspective the translog is like a
* tape where only the highest location needs to be fsynced in order to sync all previous
* locations even though they are not in the same file. When the translog rolls over files
* the previous file is fsynced on after closing if needed.*/
assert next != null : "next operation can't be null";
assert current == null || current.compareTo(next) < 0 :
"translog locations are not increasing";
return next;
}
/** Executes index operation on primary shard after updates mapping if dynamic mappings are found */
static Engine.IndexResult executeIndexRequestOnPrimary(IndexRequest request, IndexShard primary,
MappingUpdatePerformer mappingUpdater) throws Exception {

View File

@ -0,0 +1,68 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.resync;
import org.elasticsearch.action.support.replication.ReplicatedWriteRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.translog.Translog;
import java.io.IOException;
import java.util.List;
public final class ResyncReplicationRequest extends ReplicatedWriteRequest<ResyncReplicationRequest> {
private List<Translog.Operation> operations;
ResyncReplicationRequest() {
super();
}
public ResyncReplicationRequest(ShardId shardId, List<Translog.Operation> operations) {
super(shardId);
this.operations = operations;
}
public List<Translog.Operation> getOperations() {
return operations;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
operations = in.readList(Translog.Operation::readType);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeList(operations);
}
@Override
public String toString() {
return "TransportResyncReplicationAction.Request{" +
"shardId=" + shardId +
", timeout=" + timeout +
", index='" + index + '\'' +
", ops=" + operations.size() +
"}";
}
}

View File

@ -0,0 +1,30 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.resync;
import org.elasticsearch.action.support.WriteResponse;
import org.elasticsearch.action.support.replication.ReplicationResponse;
public final class ResyncReplicationResponse extends ReplicationResponse implements WriteResponse {
@Override
public void setForcedRefresh(boolean forcedRefresh) {
// ignore
}
}

View File

@ -0,0 +1,175 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.resync;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.TransportActions;
import org.elasticsearch.action.support.replication.ReplicationOperation;
import org.elasticsearch.action.support.replication.TransportReplicationAction;
import org.elasticsearch.action.support.replication.TransportWriteAction;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.seqno.SequenceNumbersService;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.PrimaryReplicaSyncer;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService;
import java.util.function.Supplier;
public class TransportResyncReplicationAction extends TransportWriteAction<ResyncReplicationRequest,
ResyncReplicationRequest, ResyncReplicationResponse> implements PrimaryReplicaSyncer.SyncAction {
public static String ACTION_NAME = "indices:admin/seq_no/resync";
@Inject
public TransportResyncReplicationAction(Settings settings, TransportService transportService,
ClusterService clusterService, IndicesService indicesService, ThreadPool threadPool,
ShardStateAction shardStateAction, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver) {
super(settings, ACTION_NAME, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters,
indexNameExpressionResolver, ResyncReplicationRequest::new, ResyncReplicationRequest::new, ThreadPool.Names.BULK);
}
@Override
protected void registerRequestHandlers(String actionName, TransportService transportService, Supplier<ResyncReplicationRequest> request,
Supplier<ResyncReplicationRequest> replicaRequest, String executor) {
transportService.registerRequestHandler(actionName, request, ThreadPool.Names.SAME, new OperationTransportHandler());
// we should never reject resync because of thread pool capacity on primary
transportService.registerRequestHandler(transportPrimaryAction,
() -> new ConcreteShardRequest<>(request),
executor, true, true,
new PrimaryOperationTransportHandler());
transportService.registerRequestHandler(transportReplicaAction,
() -> new ConcreteReplicaRequest<>(replicaRequest),
executor, true, true,
new ReplicaOperationTransportHandler());
}
@Override
protected ResyncReplicationResponse newResponseInstance() {
return new ResyncReplicationResponse();
}
@Override
protected ReplicationOperation.Replicas newReplicasProxy() {
// We treat the resync as best-effort for now and don't mark unavailable shard copies as stale.
return new ReplicasProxy();
}
@Override
protected void sendReplicaRequest(
final ConcreteReplicaRequest<ResyncReplicationRequest> replicaRequest,
final DiscoveryNode node,
final ActionListener<ReplicationOperation.ReplicaResponse> listener) {
if (node.getVersion().onOrAfter(Version.V_6_0_0_alpha1)) {
super.sendReplicaRequest(replicaRequest, node, listener);
} else {
listener.onResponse(new ReplicaResponse(replicaRequest.getTargetAllocationID(), SequenceNumbersService.UNASSIGNED_SEQ_NO));
}
}
@Override
protected WritePrimaryResult<ResyncReplicationRequest, ResyncReplicationResponse> shardOperationOnPrimary(
ResyncReplicationRequest request, IndexShard primary) throws Exception {
final ResyncReplicationRequest replicaRequest = performOnPrimary(request, primary);
return new WritePrimaryResult<>(replicaRequest, new ResyncReplicationResponse(), null, null, primary, logger);
}
public static ResyncReplicationRequest performOnPrimary(ResyncReplicationRequest request, IndexShard primary) {
return request;
}
@Override
protected WriteReplicaResult shardOperationOnReplica(ResyncReplicationRequest request, IndexShard replica) throws Exception {
Translog.Location location = performOnReplica(request, replica);
return new WriteReplicaResult(request, location, null, replica, logger);
}
public static Translog.Location performOnReplica(ResyncReplicationRequest request, IndexShard replica) throws Exception {
Translog.Location location = null;
for (Translog.Operation operation : request.getOperations()) {
try {
final Engine.Result operationResult = replica.applyTranslogOperation(operation, Engine.Operation.Origin.REPLICA,
update -> {
throw new TransportReplicationAction.RetryOnReplicaException(replica.shardId(),
"Mappings are not available on the replica yet, triggered update: " + update);
});
location = syncOperationResultOrThrow(operationResult, location);
} catch (Exception e) {
// if its not a failure to be ignored, let it bubble up
if (!TransportActions.isShardNotAvailableException(e)) {
throw e;
}
}
}
return location;
}
@Override
public void sync(ResyncReplicationRequest request, Task parentTask, String primaryAllocationId,
ActionListener<ResyncReplicationResponse> listener) {
// skip reroute phase
transportService.sendChildRequest(
clusterService.localNode(),
transportPrimaryAction,
new ConcreteShardRequest<>(request, primaryAllocationId),
parentTask,
transportOptions,
new TransportResponseHandler<ResyncReplicationResponse>() {
@Override
public ResyncReplicationResponse newInstance() {
return newResponseInstance();
}
@Override
public String executor() {
return ThreadPool.Names.SAME;
}
@Override
public void handleResponse(ResyncReplicationResponse response) {
listener.onResponse(response);
}
@Override
public void handleException(TransportException exp) {
final Throwable cause = exp.unwrapCause();
if (TransportActions.isShardNotAvailableException(cause)) {
logger.trace("primary became unavailable during resync, ignoring", exp);
} else {
listener.onFailure(exp);
}
}
});
}
}

View File

@ -95,17 +95,17 @@ public abstract class TransportReplicationAction<
Response extends ReplicationResponse
> extends TransportAction<Request, Response> {
private final TransportService transportService;
protected final TransportService transportService;
protected final ClusterService clusterService;
protected final ShardStateAction shardStateAction;
private final IndicesService indicesService;
private final TransportRequestOptions transportOptions;
private final String executor;
protected final IndicesService indicesService;
protected final TransportRequestOptions transportOptions;
protected final String executor;
// package private for testing
private final String transportReplicaAction;
private final String transportPrimaryAction;
private final ReplicationOperation.Replicas replicasProxy;
protected final String transportReplicaAction;
protected final String transportPrimaryAction;
protected final ReplicationOperation.Replicas replicasProxy;
protected TransportReplicationAction(Settings settings, String actionName, TransportService transportService,
ClusterService clusterService, IndicesService indicesService,
@ -122,6 +122,15 @@ public abstract class TransportReplicationAction<
this.transportPrimaryAction = actionName + "[p]";
this.transportReplicaAction = actionName + "[r]";
registerRequestHandlers(actionName, transportService, request, replicaRequest, executor);
this.transportOptions = transportOptions();
this.replicasProxy = newReplicasProxy();
}
protected void registerRequestHandlers(String actionName, TransportService transportService, Supplier<Request> request,
Supplier<ReplicaRequest> replicaRequest, String executor) {
transportService.registerRequestHandler(actionName, request, ThreadPool.Names.SAME, new OperationTransportHandler());
transportService.registerRequestHandler(transportPrimaryAction, () -> new ConcreteShardRequest<>(request), executor,
new PrimaryOperationTransportHandler());
@ -130,10 +139,6 @@ public abstract class TransportReplicationAction<
() -> new ConcreteReplicaRequest<>(replicaRequest),
executor, true, true,
new ReplicaOperationTransportHandler());
this.transportOptions = transportOptions();
this.replicasProxy = newReplicasProxy();
}
@Override
@ -217,7 +222,12 @@ public abstract class TransportReplicationAction<
|| TransportActions.isShardNotAvailableException(e);
}
class OperationTransportHandler implements TransportRequestHandler<Request> {
protected class OperationTransportHandler implements TransportRequestHandler<Request> {
public OperationTransportHandler() {
}
@Override
public void messageReceived(final Request request, final TransportChannel channel, Task task) throws Exception {
execute(task, request, new ActionListener<Response>() {
@ -250,7 +260,12 @@ public abstract class TransportReplicationAction<
}
}
class PrimaryOperationTransportHandler implements TransportRequestHandler<ConcreteShardRequest<Request>> {
protected class PrimaryOperationTransportHandler implements TransportRequestHandler<ConcreteShardRequest<Request>> {
public PrimaryOperationTransportHandler() {
}
@Override
public void messageReceived(final ConcreteShardRequest<Request> request, final TransportChannel channel) throws Exception {
throw new UnsupportedOperationException("the task parameter is required for this operation");
@ -314,7 +329,6 @@ public abstract class TransportReplicationAction<
});
} else {
setPhase(replicationTask, "primary");
final IndexMetaData indexMetaData = clusterService.state().getMetaData().index(request.shardId().getIndex());
final ActionListener<Response> listener = createResponseListener(primaryShardReference);
createReplicatedOperation(request,
ActionListener.wrap(result -> result.respond(listener), listener::onFailure),
@ -437,7 +451,7 @@ public abstract class TransportReplicationAction<
}
}
class ReplicaOperationTransportHandler implements TransportRequestHandler<ConcreteReplicaRequest<ReplicaRequest>> {
public class ReplicaOperationTransportHandler implements TransportRequestHandler<ConcreteReplicaRequest<ReplicaRequest>> {
@Override
public void messageReceived(
@ -1049,7 +1063,11 @@ public abstract class TransportReplicationAction<
* shards. It also encapsulates the logic required for failing the replica
* if deemed necessary as well as marking it as stale when needed.
*/
class ReplicasProxy implements ReplicationOperation.Replicas<ReplicaRequest> {
protected class ReplicasProxy implements ReplicationOperation.Replicas<ReplicaRequest> {
public ReplicasProxy() {
}
@Override
public void performOn(
@ -1112,13 +1130,13 @@ public abstract class TransportReplicationAction<
private R request;
ConcreteShardRequest(Supplier<R> requestSupplier) {
public ConcreteShardRequest(Supplier<R> requestSupplier) {
request = requestSupplier.get();
// null now, but will be populated by reading from the streams
targetAllocationID = null;
}
ConcreteShardRequest(R request, String targetAllocationID) {
public ConcreteShardRequest(R request, String targetAllocationID) {
Objects.requireNonNull(request);
Objects.requireNonNull(targetAllocationID);
this.request = request;

View File

@ -23,6 +23,7 @@ import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.TransportActions;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.support.WriteResponse;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
@ -32,6 +33,11 @@ import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.mapper.MapperParsingException;
import org.elasticsearch.index.mapper.Mapping;
import org.elasticsearch.index.mapper.SourceToParse;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.translog.Translog;
@ -43,6 +49,7 @@ import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
@ -67,6 +74,37 @@ public abstract class TransportWriteAction<
indexNameExpressionResolver, request, replicaRequest, executor);
}
/** Syncs operation result to the translog or throws a shard not available failure */
protected static Location syncOperationResultOrThrow(final Engine.Result operationResult,
final Location currentLocation) throws Exception {
final Location location;
if (operationResult.hasFailure()) {
// check if any transient write operation failures should be bubbled up
Exception failure = operationResult.getFailure();
assert failure instanceof MapperParsingException : "expected mapper parsing failures. got " + failure;
if (!TransportActions.isShardNotAvailableException(failure)) {
throw failure;
} else {
location = currentLocation;
}
} else {
location = locationToSync(currentLocation, operationResult.getTranslogLocation());
}
return location;
}
protected static Location locationToSync(Location current, Location next) {
/* here we are moving forward in the translog with each operation. Under the hood this might
* cross translog files which is ok since from the user perspective the translog is like a
* tape where only the highest location needs to be fsynced in order to sync all previous
* locations even though they are not in the same file. When the translog rolls over files
* the previous file is fsynced on after closing if needed.*/
assert next != null : "next operation can't be null";
assert current == null || current.compareTo(next) < 0 :
"translog locations are not increasing";
return next;
}
@Override
protected ReplicationOperation.Replicas newReplicasProxy() {
return new WriteActionReplicasProxy();
@ -356,8 +394,8 @@ public abstract class TransportWriteAction<
createListener(onSuccess, onPrimaryDemoted, onIgnoredFailure));
}
public ShardStateAction.Listener createListener(final Runnable onSuccess, final Consumer<Exception> onPrimaryDemoted,
final Consumer<Exception> onIgnoredFailure) {
private ShardStateAction.Listener createListener(final Runnable onSuccess, final Consumer<Exception> onPrimaryDemoted,
final Consumer<Exception> onIgnoredFailure) {
return new ShardStateAction.Listener() {
@Override
public void onSuccess() {

View File

@ -45,6 +45,7 @@ import org.elasticsearch.cluster.routing.RecoverySource;
import org.elasticsearch.cluster.routing.RecoverySource.SnapshotRecoverySource;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Booleans;
import org.elasticsearch.common.CheckedBiConsumer;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
@ -101,6 +102,7 @@ import org.elasticsearch.index.search.stats.SearchStats;
import org.elasticsearch.index.search.stats.ShardSearchStats;
import org.elasticsearch.index.seqno.SeqNoStats;
import org.elasticsearch.index.seqno.SequenceNumbersService;
import org.elasticsearch.index.shard.PrimaryReplicaSyncer.ResyncTask;
import org.elasticsearch.index.similarity.SimilarityService;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.store.Store.MetadataSnapshot;
@ -344,8 +346,11 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
* Notifies the shard of an increase in the primary term.
*
* @param newPrimaryTerm the new primary term
* @param primaryReplicaSyncer the primary-replica resync action to trigger when a term is increased on a primary
*/
public void updatePrimaryTerm(final long newPrimaryTerm) {
@Override
public void updatePrimaryTerm(final long newPrimaryTerm,
CheckedBiConsumer<IndexShard, ActionListener<ResyncTask>, IOException> primaryReplicaSyncer) {
assert shardRouting.primary() : "primary term can only be explicitly updated on a primary shard";
synchronized (mutex) {
if (newPrimaryTerm != primaryTerm) {
@ -374,6 +379,11 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
* incremented.
*/
final CountDownLatch latch = new CountDownLatch(1);
// to prevent primary relocation handoff while resync is not completed
boolean resyncStarted = primaryReplicaResyncInProgress.compareAndSet(false, true);
if (resyncStarted == false) {
throw new IllegalStateException("cannot start resync while it's already in progress");
}
indexShardOperationPermits.asyncBlockOperations(
30,
TimeUnit.MINUTES,
@ -381,6 +391,26 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
latch.await();
try {
getEngine().fillSeqNoGaps(newPrimaryTerm);
primaryReplicaSyncer.accept(IndexShard.this, new ActionListener<ResyncTask>() {
@Override
public void onResponse(ResyncTask resyncTask) {
logger.info("primary-replica resync completed with {} operations",
resyncTask.getResyncedOperations());
boolean resyncCompleted = primaryReplicaResyncInProgress.compareAndSet(true, false);
assert resyncCompleted : "primary-replica resync finished but was not started";
}
@Override
public void onFailure(Exception e) {
boolean resyncCompleted = primaryReplicaResyncInProgress.compareAndSet(true, false);
assert resyncCompleted : "primary-replica resync finished but was not started";
if (state == IndexShardState.CLOSED) {
// ignore, shutting down
} else {
failShard("exception during primary-replica resync", e);
}
}
});
} catch (final AlreadyClosedException e) {
// okay, the index was deleted
}
@ -483,6 +513,8 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
}
}
private final AtomicBoolean primaryReplicaResyncInProgress = new AtomicBoolean();
public void relocated(String reason) throws IllegalIndexShardStateException, InterruptedException {
assert shardRouting.primary() : "only primaries can be marked as relocated: " + shardRouting;
try {
@ -503,6 +535,10 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
throw new IllegalIndexShardStateException(shardId, IndexShardState.STARTED,
": shard is no longer relocating " + shardRouting);
}
if (primaryReplicaResyncInProgress.get()) {
throw new IllegalIndexShardStateException(shardId, IndexShardState.STARTED,
": primary relocation is forbidden while primary-replica resync is in progress " + shardRouting);
}
changeState(IndexShardState.RELOCATED, reason);
}
});
@ -1087,7 +1123,6 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
default:
throw new IllegalStateException("No operation defined for [" + operation + "]");
}
ExceptionsHelper.reThrowIfNotNull(result.getFailure());
return result;
}
@ -1100,9 +1135,10 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
while ((operation = snapshot.next()) != null) {
try {
logger.trace("[translog] recover op {}", operation);
applyTranslogOperation(operation, Engine.Operation.Origin.LOCAL_TRANSLOG_RECOVERY, update -> {
Engine.Result result = applyTranslogOperation(operation, Engine.Operation.Origin.LOCAL_TRANSLOG_RECOVERY, update -> {
throw new IllegalArgumentException("unexpected mapping update: " + update);
});
ExceptionsHelper.reThrowIfNotNull(result.getFailure());
opsRecovered++;
recoveryState.getTranslog().incrementRecoveredOperations();
} catch (Exception e) {

View File

@ -0,0 +1,391 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.shard;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.resync.ResyncReplicationRequest;
import org.elasticsearch.action.resync.ResyncReplicationResponse;
import org.elasticsearch.action.resync.TransportResyncReplicationAction;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.seqno.SequenceNumbersService;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import static java.util.Objects.requireNonNull;
public class PrimaryReplicaSyncer extends AbstractComponent {
private final TaskManager taskManager;
private final SyncAction syncAction;
public static final ByteSizeValue DEFAULT_CHUNK_SIZE = new ByteSizeValue(512, ByteSizeUnit.KB);
private volatile ByteSizeValue chunkSize = DEFAULT_CHUNK_SIZE;
@Inject
public PrimaryReplicaSyncer(Settings settings, TransportService transportService, TransportResyncReplicationAction syncAction) {
this(settings, transportService.getTaskManager(), syncAction);
}
// for tests
public PrimaryReplicaSyncer(Settings settings, TaskManager taskManager, SyncAction syncAction) {
super(settings);
this.taskManager = taskManager;
this.syncAction = syncAction;
}
void setChunkSize(ByteSizeValue chunkSize) { // only settable for tests
if (chunkSize.bytesAsInt() <= 0) {
throw new IllegalArgumentException("chunkSize must be > 0");
}
this.chunkSize = chunkSize;
}
public void resync(IndexShard indexShard, ActionListener<ResyncTask> listener) throws IOException {
try (Translog.View view = indexShard.acquireTranslogView()) {
Translog.Snapshot snapshot = view.snapshot();
ShardId shardId = indexShard.shardId();
// Wrap translog snapshot to make it synchronized as it is accessed by different threads through SnapshotSender.
// Even though those calls are not concurrent, snapshot.next() uses non-synchronized state and is not multi-thread-compatible
// Also fail the resync early if the shard is shutting down
Translog.Snapshot wrappedSnapshot = new Translog.Snapshot() {
@Override
public synchronized int totalOperations() {
return snapshot.totalOperations();
}
@Override
public synchronized Translog.Operation next() throws IOException {
if (indexShard.state() != IndexShardState.STARTED) {
assert indexShard.state() != IndexShardState.RELOCATED : "resync should never happen on a relocated shard";
throw new IndexShardNotStartedException(shardId, indexShard.state());
}
return snapshot.next();
}
};
resync(shardId, indexShard.routingEntry().allocationId().getId(), wrappedSnapshot,
indexShard.getGlobalCheckpoint() + 1, listener);
}
}
private void resync(final ShardId shardId, final String primaryAllocationId, final Translog.Snapshot snapshot,
long startingSeqNo, ActionListener<ResyncTask> listener) {
ResyncRequest request = new ResyncRequest(shardId, primaryAllocationId);
ResyncTask resyncTask = (ResyncTask) taskManager.register("transport", "resync", request); // it's not transport :-)
ActionListener<Void> wrappedListener = new ActionListener<Void>() {
@Override
public void onResponse(Void ignore) {
resyncTask.setPhase("finished");
taskManager.unregister(resyncTask);
listener.onResponse(resyncTask);
}
@Override
public void onFailure(Exception e) {
resyncTask.setPhase("finished");
taskManager.unregister(resyncTask);
listener.onFailure(e);
}
};
try {
new SnapshotSender(logger, syncAction, resyncTask, shardId, primaryAllocationId, snapshot, chunkSize.bytesAsInt(),
startingSeqNo, wrappedListener).run();
} catch (Exception e) {
wrappedListener.onFailure(e);
}
}
public interface SyncAction {
void sync(ResyncReplicationRequest request, Task parentTask, String primaryAllocationId,
ActionListener<ResyncReplicationResponse> listener);
}
static class SnapshotSender extends AbstractRunnable implements ActionListener<ResyncReplicationResponse> {
private final Logger logger;
private final SyncAction syncAction;
private final ResyncTask task; // to track progress
private final String primaryAllocationId;
private final ShardId shardId;
private final Translog.Snapshot snapshot;
private final long startingSeqNo;
private final int chunkSizeInBytes;
private final ActionListener<Void> listener;
private final AtomicInteger totalSentOps = new AtomicInteger();
private final AtomicInteger totalSkippedOps = new AtomicInteger();
private AtomicBoolean closed = new AtomicBoolean();
SnapshotSender(Logger logger, SyncAction syncAction, ResyncTask task, ShardId shardId, String primaryAllocationId,
Translog.Snapshot snapshot, int chunkSizeInBytes, long startingSeqNo, ActionListener<Void> listener) {
this.logger = logger;
this.syncAction = syncAction;
this.task = task;
this.shardId = shardId;
this.primaryAllocationId = primaryAllocationId;
this.snapshot = snapshot;
this.chunkSizeInBytes = chunkSizeInBytes;
this.startingSeqNo = startingSeqNo;
this.listener = listener;
task.setTotalOperations(snapshot.totalOperations());
}
@Override
public void onResponse(ResyncReplicationResponse response) {
run();
}
@Override
public void onFailure(Exception e) {
if (closed.compareAndSet(false, true)) {
listener.onFailure(e);
}
}
@Override
protected void doRun() throws Exception {
long size = 0;
final List<Translog.Operation> operations = new ArrayList<>();
task.setPhase("collecting_ops");
task.setResyncedOperations(totalSentOps.get());
task.setSkippedOperations(totalSkippedOps.get());
Translog.Operation operation;
while ((operation = snapshot.next()) != null) {
final long seqNo = operation.seqNo();
if (startingSeqNo >= 0 &&
(seqNo == SequenceNumbersService.UNASSIGNED_SEQ_NO || seqNo < startingSeqNo)) {
totalSkippedOps.incrementAndGet();
continue;
}
operations.add(operation);
size += operation.estimateSize();
totalSentOps.incrementAndGet();
// check if this request is past bytes threshold, and if so, send it off
if (size >= chunkSizeInBytes) {
break;
}
}
if (!operations.isEmpty()) {
task.setPhase("sending_ops");
ResyncReplicationRequest request = new ResyncReplicationRequest(shardId, operations);
logger.trace("{} sending batch of [{}][{}] (total sent: [{}], skipped: [{}])", shardId, operations.size(),
new ByteSizeValue(size), totalSentOps.get(), totalSkippedOps.get());
syncAction.sync(request, task, primaryAllocationId, this);
} else if (closed.compareAndSet(false, true)) {
logger.trace("{} resync completed (total sent: [{}], skipped: [{}])", shardId, totalSentOps.get(), totalSkippedOps.get());
listener.onResponse(null);
}
}
}
public static class ResyncRequest extends ActionRequest {
private final ShardId shardId;
private final String allocationId;
public ResyncRequest(ShardId shardId, String allocationId) {
this.shardId = shardId;
this.allocationId = allocationId;
}
@Override
public Task createTask(long id, String type, String action, TaskId parentTaskId) {
return new ResyncTask(id, type, action, getDescription(), parentTaskId);
}
@Override
public String getDescription() {
return toString();
}
@Override
public String toString() {
return "ResyncRequest{ " + shardId + ", " + allocationId + " }";
}
@Override
public ActionRequestValidationException validate() {
return null;
}
}
public static class ResyncTask extends Task {
private volatile String phase = "starting";
private volatile int totalOperations;
private volatile int resyncedOperations;
private volatile int skippedOperations;
public ResyncTask(long id, String type, String action, String description, TaskId parentTaskId) {
super(id, type, action, description, parentTaskId);
}
/**
* Set the current phase of the task.
*/
public void setPhase(String phase) {
this.phase = phase;
}
/**
* Get the current phase of the task.
*/
public String getPhase() {
return phase;
}
/**
* total number of translog operations that were captured by translog snapshot
*/
public int getTotalOperations() {
return totalOperations;
}
public void setTotalOperations(int totalOperations) {
this.totalOperations = totalOperations;
}
/**
* number of operations that have been successfully replicated
*/
public int getResyncedOperations() {
return resyncedOperations;
}
public void setResyncedOperations(int resyncedOperations) {
this.resyncedOperations = resyncedOperations;
}
/**
* number of translog operations that have been skipped
*/
public int getSkippedOperations() {
return skippedOperations;
}
public void setSkippedOperations(int skippedOperations) {
this.skippedOperations = skippedOperations;
}
@Override
public ResyncTask.Status getStatus() {
return new ResyncTask.Status(phase, totalOperations, resyncedOperations, skippedOperations);
}
public static class Status implements Task.Status {
public static final String NAME = "resync";
private final String phase;
private final int totalOperations;
private final int resyncedOperations;
private final int skippedOperations;
public Status(StreamInput in) throws IOException {
phase = in.readString();
totalOperations = in.readVInt();
resyncedOperations = in.readVInt();
skippedOperations = in.readVInt();
}
public Status(String phase, int totalOperations, int resyncedOperations, int skippedOperations) {
this.phase = requireNonNull(phase, "Phase cannot be null");
this.totalOperations = totalOperations;
this.resyncedOperations = resyncedOperations;
this.skippedOperations = skippedOperations;
}
@Override
public String getWriteableName() {
return NAME;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field("phase", phase);
builder.field("totalOperations", totalOperations);
builder.field("resyncedOperations", resyncedOperations);
builder.field("skippedOperations", skippedOperations);
builder.endObject();
return builder;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(phase);
out.writeVLong(totalOperations);
out.writeVLong(resyncedOperations);
out.writeVLong(skippedOperations);
}
@Override
public String toString() {
return Strings.toString(this);
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Status status = (Status) o;
if (totalOperations != status.totalOperations) return false;
if (resyncedOperations != status.resyncedOperations) return false;
if (skippedOperations != status.skippedOperations) return false;
return phase.equals(status.phase);
}
@Override
public int hashCode() {
int result = phase.hashCode();
result = 31 * result + totalOperations;
result = 31 * result + resyncedOperations;
result = 31 * result + skippedOperations;
return result;
}
}
}
}

View File

@ -22,6 +22,8 @@ package org.elasticsearch.indices;
import org.elasticsearch.action.admin.indices.rollover.Condition;
import org.elasticsearch.action.admin.indices.rollover.MaxAgeCondition;
import org.elasticsearch.action.admin.indices.rollover.MaxDocsCondition;
import org.elasticsearch.action.resync.TransportResyncReplicationAction;
import org.elasticsearch.index.shard.PrimaryReplicaSyncer;
import org.elasticsearch.common.geo.ShapesAvailability;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry.Entry;
@ -165,6 +167,8 @@ public class IndicesModule extends AbstractModule {
bind(SyncedFlushService.class).asEagerSingleton();
bind(TransportNodesListShardStoreMetaData.class).asEagerSingleton();
bind(GlobalCheckpointSyncAction.class).asEagerSingleton();
bind(TransportResyncReplicationAction.class).asEagerSingleton();
bind(PrimaryReplicaSyncer.class).asEagerSingleton();
}
/**

View File

@ -25,6 +25,7 @@ import org.apache.logging.log4j.util.Supplier;
import org.apache.lucene.store.LockObtainFailedException;
import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateApplier;
@ -40,6 +41,7 @@ import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.CheckedBiConsumer;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
@ -59,6 +61,8 @@ import org.elasticsearch.index.shard.IndexEventListener;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardRelocatedException;
import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.shard.PrimaryReplicaSyncer;
import org.elasticsearch.index.shard.PrimaryReplicaSyncer.ResyncTask;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardNotFoundException;
import org.elasticsearch.indices.IndicesService;
@ -83,6 +87,7 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.Collectors;
@ -112,6 +117,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
private final boolean sendRefreshMapping;
private final List<IndexEventListener> buildInIndexListener;
private final PrimaryReplicaSyncer primaryReplicaSyncer;
@Inject
public IndicesClusterStateService(Settings settings, IndicesService indicesService, ClusterService clusterService,
@ -121,11 +127,12 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
RepositoriesService repositoriesService,
SearchService searchService, SyncedFlushService syncedFlushService,
PeerRecoverySourceService peerRecoverySourceService, SnapshotShardsService snapshotShardsService,
GlobalCheckpointSyncAction globalCheckpointSyncAction) {
GlobalCheckpointSyncAction globalCheckpointSyncAction,
PrimaryReplicaSyncer primaryReplicaSyncer) {
this(settings, (AllocatedIndices<? extends Shard, ? extends AllocatedIndex<? extends Shard>>) indicesService,
clusterService, threadPool, recoveryTargetService, shardStateAction,
nodeMappingRefreshAction, repositoriesService, searchService, syncedFlushService, peerRecoverySourceService,
snapshotShardsService, globalCheckpointSyncAction);
snapshotShardsService, globalCheckpointSyncAction, primaryReplicaSyncer);
}
// for tests
@ -138,7 +145,8 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
RepositoriesService repositoriesService,
SearchService searchService, SyncedFlushService syncedFlushService,
PeerRecoverySourceService peerRecoverySourceService, SnapshotShardsService snapshotShardsService,
GlobalCheckpointSyncAction globalCheckpointSyncAction) {
GlobalCheckpointSyncAction globalCheckpointSyncAction,
PrimaryReplicaSyncer primaryReplicaSyncer) {
super(settings);
this.buildInIndexListener =
Arrays.asList(
@ -155,6 +163,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
this.shardStateAction = shardStateAction;
this.nodeMappingRefreshAction = nodeMappingRefreshAction;
this.repositoriesService = repositoriesService;
this.primaryReplicaSyncer = primaryReplicaSyncer;
this.sendRefreshMapping = this.settings.getAsBoolean("indices.cluster.send_refresh_mapping", true);
}
@ -560,7 +569,8 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
allocationIdsForShardsOnNodesThatUnderstandSeqNos(indexShardRoutingTable.activeShards(), nodes);
final Set<String> initializingIds =
allocationIdsForShardsOnNodesThatUnderstandSeqNos(indexShardRoutingTable.getAllInitializingShards(), nodes);
shard.updatePrimaryTerm(clusterState.metaData().index(shard.shardId().getIndex()).primaryTerm(shard.shardId().id()));
shard.updatePrimaryTerm(clusterState.metaData().index(shard.shardId().getIndex()).primaryTerm(shard.shardId().id()),
primaryReplicaSyncer::resync);
shard.updateAllocationIdsFromMaster(activeIds, initializingIds);
}
} catch (Exception e) {
@ -741,8 +751,10 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
* Update the primary term. This method should only be invoked on primary shards.
*
* @param primaryTerm the new primary term
* @param primaryReplicaSyncer the primary-replica resync action to trigger when a term is increased on a primary
*/
void updatePrimaryTerm(long primaryTerm);
void updatePrimaryTerm(long primaryTerm,
CheckedBiConsumer<IndexShard, ActionListener<ResyncTask>, IOException> primaryReplicaSyncer);
/**
* Notifies the service of the current allocation ids in the cluster state.

View File

@ -41,6 +41,7 @@ import org.elasticsearch.common.util.concurrent.AbstractRefCounted;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.mapper.MapperException;
import org.elasticsearch.index.mapper.MapperParsingException;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardNotRecoveringException;
import org.elasticsearch.index.shard.IndexShardState;
@ -387,9 +388,11 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget
throw new IndexShardNotRecoveringException(shardId, indexShard().state());
}
for (Translog.Operation operation : operations) {
indexShard().applyTranslogOperation(operation, Engine.Operation.Origin.PEER_RECOVERY, update -> {
Engine.Result result = indexShard().applyTranslogOperation(operation, Engine.Operation.Origin.PEER_RECOVERY, update -> {
throw new MapperException("mapping updates are not allowed [" + operation + "]");
});
assert result.hasFailure() == false : "unexpected failure while replicating translog entry: " + result.getFailure();
ExceptionsHelper.reThrowIfNotNull(result.getFailure());
}
// update stats only after all operations completed (to ensure that mapping updates don't mess with stats)
translog.incrementRecoveredOperations(operations.size());

View File

@ -524,6 +524,19 @@ public class TransportService extends AbstractLifecycleComponent {
}
}
public final <T extends TransportResponse> void sendChildRequest(final DiscoveryNode node, final String action,
final TransportRequest request, final Task parentTask,
final TransportRequestOptions options,
final TransportResponseHandler<T> handler) {
try {
Transport.Connection connection = getConnection(node);
sendChildRequest(connection, action, request, parentTask, options, handler);
} catch (NodeNotConnectedException ex) {
// the caller might not handle this so we invoke the handler
handler.handleException(ex);
}
}
public <T extends TransportResponse> void sendChildRequest(final Transport.Connection connection, final String action,
final TransportRequest request, final Task parentTask,
final TransportResponseHandler<T> handler) {

View File

@ -121,7 +121,8 @@ public class TransportBulkActionIngestTests extends ESTestCase {
class TestSingleItemBulkWriteAction extends TransportSingleItemBulkWriteAction<IndexRequest, IndexResponse> {
TestSingleItemBulkWriteAction(TestTransportBulkAction bulkAction) {
super(Settings.EMPTY, IndexAction.NAME, transportService, TransportBulkActionIngestTests.this.clusterService,
super(Settings.EMPTY, IndexAction.NAME, TransportBulkActionIngestTests.this.transportService,
TransportBulkActionIngestTests.this.clusterService,
null, null, null, new ActionFilters(Collections.emptySet()), null,
IndexRequest::new, IndexRequest::new, ThreadPool.Names.INDEX, bulkAction, null);
}

View File

@ -31,6 +31,9 @@ import org.elasticsearch.action.bulk.BulkShardResponse;
import org.elasticsearch.action.bulk.TransportShardBulkAction;
import org.elasticsearch.action.bulk.TransportShardBulkActionTests;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.resync.ResyncReplicationRequest;
import org.elasticsearch.action.resync.ResyncReplicationResponse;
import org.elasticsearch.action.resync.TransportResyncReplicationAction;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.replication.ReplicationOperation;
import org.elasticsearch.action.support.replication.ReplicationRequest;
@ -56,13 +59,14 @@ import org.elasticsearch.index.engine.EngineFactory;
import org.elasticsearch.index.seqno.GlobalCheckpointSyncAction;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardTestCase;
import org.elasticsearch.index.shard.PrimaryReplicaSyncer;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardPath;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.indices.recovery.RecoveryTarget;
import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportRequest;
import java.io.IOException;
import java.util.ArrayList;
@ -124,6 +128,14 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
private final AtomicInteger replicaId = new AtomicInteger();
private final AtomicInteger docId = new AtomicInteger();
boolean closed = false;
private final PrimaryReplicaSyncer primaryReplicaSyncer = new PrimaryReplicaSyncer(Settings.EMPTY, new TaskManager(Settings.EMPTY),
(request, parentTask, primaryAllocationId, listener) -> {
try {
new ResyncAction(request, listener, ReplicationGroup.this).execute();
} catch (Exception e) {
throw new AssertionError(e);
}
});
ReplicationGroup(final IndexMetaData indexMetaData) throws IOException {
final ShardRouting primaryRouting = this.createShardRouting("s0", true);
@ -254,7 +266,7 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
/**
* promotes the specific replica as the new primary
*/
public synchronized void promoteReplicaToPrimary(IndexShard replica) throws IOException {
public synchronized Future<PrimaryReplicaSyncer.ResyncTask> promoteReplicaToPrimary(IndexShard replica) throws IOException {
final long newTerm = indexMetaData.primaryTerm(shardId.id()) + 1;
IndexMetaData.Builder newMetaData = IndexMetaData.builder(indexMetaData).primaryTerm(shardId.id(), newTerm);
indexMetaData = newMetaData.build();
@ -262,8 +274,23 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
closeShards(primary);
primary = replica;
primary.updateRoutingEntry(replica.routingEntry().moveActiveReplicaToPrimary());
primary.updatePrimaryTerm(newTerm);
PlainActionFuture<PrimaryReplicaSyncer.ResyncTask> fut = new PlainActionFuture<>();
primary.updatePrimaryTerm(newTerm, (shard, listener) -> primaryReplicaSyncer.resync(shard,
new ActionListener<PrimaryReplicaSyncer.ResyncTask>() {
@Override
public void onResponse(PrimaryReplicaSyncer.ResyncTask resyncTask) {
listener.onResponse(resyncTask);
fut.onResponse(resyncTask);
}
@Override
public void onFailure(Exception e) {
listener.onFailure(e);
fut.onFailure(e);
}
}));
updateAllocationIDsOnPrimary();
return fut;
}
synchronized boolean removeReplica(IndexShard replica) {
@ -625,4 +652,37 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
}
}
class ResyncAction extends ReplicationAction<ResyncReplicationRequest, ResyncReplicationRequest, ResyncReplicationResponse> {
ResyncAction(ResyncReplicationRequest request, ActionListener<ResyncReplicationResponse> listener, ReplicationGroup replicationGroup) {
super(request, listener, replicationGroup, "resync");
}
@Override
protected PrimaryResult performOnPrimary(IndexShard primary, ResyncReplicationRequest request) throws Exception {
final TransportWriteAction.WritePrimaryResult<ResyncReplicationRequest, ResyncReplicationResponse> result =
executeResyncOnPrimary(primary, request);
return new PrimaryResult(result.replicaRequest(), result.finalResponseIfSuccessful);
}
@Override
protected void performOnReplica(ResyncReplicationRequest request, IndexShard replica) throws Exception {
executeResyncOnReplica(replica, request);
}
}
private TransportWriteAction.WritePrimaryResult<ResyncReplicationRequest, ResyncReplicationResponse> executeResyncOnPrimary(
IndexShard primary, ResyncReplicationRequest request) throws Exception {
final TransportWriteAction.WritePrimaryResult<ResyncReplicationRequest, ResyncReplicationResponse> result =
new TransportWriteAction.WritePrimaryResult<>(TransportResyncReplicationAction.performOnPrimary(request, primary),
new ResyncReplicationResponse(), null, null, primary, logger);
request.primaryTerm(primary.getPrimaryTerm());
TransportWriteActionTestHelper.performPostWriteActions(primary, request, result.location, logger);
return result;
}
private void executeResyncOnReplica(IndexShard replica, ResyncReplicationRequest request) throws Exception {
final Translog.Location location = TransportResyncReplicationAction.performOnReplica(request, replica);
TransportWriteActionTestHelper.performPostWriteActions(replica, request, location, logger);
}
}

View File

@ -36,6 +36,7 @@ import org.elasticsearch.index.engine.EngineConfig;
import org.elasticsearch.index.engine.EngineFactory;
import org.elasticsearch.index.engine.InternalEngineTests;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.PrimaryReplicaSyncer;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.indices.recovery.PeerRecoveryTargetService;
@ -55,6 +56,7 @@ import java.util.concurrent.atomic.AtomicReference;
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.not;
@ -201,6 +203,41 @@ public class RecoveryDuringReplicationTests extends ESIndexLevelReplicationTestC
}
}
@TestLogging("org.elasticsearch.index.shard:TRACE,org.elasticsearch.action.resync:TRACE")
public void testResyncAfterPrimaryPromotion() throws Exception {
// TODO: check translog trimming functionality once it's implemented
try (ReplicationGroup shards = createGroup(2)) {
shards.startAll();
int initialDocs = shards.indexDocs(randomInt(10));
boolean syncedGlobalCheckPoint = randomBoolean();
if (syncedGlobalCheckPoint) {
shards.syncGlobalCheckpoint();
}
final IndexShard oldPrimary = shards.getPrimary();
final IndexShard newPrimary = shards.getReplicas().get(0);
final IndexShard otherReplica = shards.getReplicas().get(1);
// simulate docs that were inflight when primary failed
final int extraDocs = randomIntBetween(0, 5);
logger.info("--> indexing {} extra docs", extraDocs);
for (int i = 0; i < extraDocs; i++) {
final IndexRequest indexRequest = new IndexRequest(index.getName(), "type", "extra_" + i)
.source("{}", XContentType.JSON);
final BulkShardRequest bulkShardRequest = indexOnPrimary(indexRequest, oldPrimary);
indexOnReplica(bulkShardRequest, newPrimary);
}
logger.info("--> resyncing replicas");
PrimaryReplicaSyncer.ResyncTask task = shards.promoteReplicaToPrimary(newPrimary).get();
if (syncedGlobalCheckPoint) {
assertEquals(extraDocs, task.getResyncedOperations());
} else {
assertThat(task.getResyncedOperations(), greaterThanOrEqualTo(extraDocs));
}
shards.assertAllEqual(initialDocs + extraDocs);
}
}
@TestLogging(
"_root:DEBUG,"
+ "org.elasticsearch.action.bulk:TRACE,"

View File

@ -61,7 +61,6 @@ import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
@ -133,9 +132,6 @@ import static java.util.Collections.emptySet;
import static org.elasticsearch.common.lucene.Lucene.cleanLuceneIndex;
import static org.elasticsearch.common.xcontent.ToXContent.EMPTY_PARAMS;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.index.VersionType.EXTERNAL;
import static org.elasticsearch.index.engine.Engine.Operation.Origin.PRIMARY;
import static org.elasticsearch.index.engine.Engine.Operation.Origin.REPLICA;
import static org.elasticsearch.repositories.RepositoryData.EMPTY_REPO_GEN;
import static org.elasticsearch.test.hamcrest.RegexMatcher.matches;
import static org.hamcrest.Matchers.containsString;
@ -340,7 +336,7 @@ public class IndexShardTests extends IndexShardTestCase {
ShardRoutingState.STARTED,
replicaRouting.allocationId());
indexShard.updateRoutingEntry(primaryRouting);
indexShard.updatePrimaryTerm(indexShard.getPrimaryTerm() + 1);
indexShard.updatePrimaryTerm(indexShard.getPrimaryTerm() + 1, (shard, listener) -> {});
final int delayedOperations = scaledRandomIntBetween(1, 64);
final CyclicBarrier delayedOperationsBarrier = new CyclicBarrier(1 + delayedOperations);
@ -431,7 +427,7 @@ public class IndexShardTests extends IndexShardTestCase {
ShardRoutingState.STARTED,
replicaRouting.allocationId());
indexShard.updateRoutingEntry(primaryRouting);
indexShard.updatePrimaryTerm(indexShard.getPrimaryTerm() + 1);
indexShard.updatePrimaryTerm(indexShard.getPrimaryTerm() + 1, (shard, listener) -> {});
/*
* This operation completing means that the delay operation executed as part of increasing the primary term has completed and the
@ -473,7 +469,7 @@ public class IndexShardTests extends IndexShardTestCase {
ShardRouting primaryRouting = TestShardRouting.newShardRouting(replicaRouting.shardId(), replicaRouting.currentNodeId(), null,
true, ShardRoutingState.STARTED, replicaRouting.allocationId());
indexShard.updateRoutingEntry(primaryRouting);
indexShard.updatePrimaryTerm(indexShard.getPrimaryTerm() + 1);
indexShard.updatePrimaryTerm(indexShard.getPrimaryTerm() + 1, (shard, listener) -> {});
} else {
indexShard = newStartedShard(true);
}

View File

@ -0,0 +1,139 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.shard;
import org.elasticsearch.action.resync.ResyncReplicationResponse;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.common.io.stream.ByteBufferStreamInput;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.tasks.TaskManager;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.core.IsInstanceOf.instanceOf;
public class PrimaryReplicaSyncerTests extends IndexShardTestCase {
public void testSyncerSendsOffCorrectDocuments() throws Exception {
IndexShard shard = newStartedShard(true);
TaskManager taskManager = new TaskManager(Settings.EMPTY);
AtomicBoolean syncActionCalled = new AtomicBoolean();
PrimaryReplicaSyncer.SyncAction syncAction =
(request, parentTask, allocationId, listener) -> {
logger.info("Sending off {} operations", request.getOperations().size());
syncActionCalled.set(true);
assertThat(parentTask, instanceOf(PrimaryReplicaSyncer.ResyncTask.class));
listener.onResponse(new ResyncReplicationResponse());
};
PrimaryReplicaSyncer syncer = new PrimaryReplicaSyncer(Settings.EMPTY, taskManager, syncAction);
syncer.setChunkSize(new ByteSizeValue(randomIntBetween(1, 100)));
int numDocs = randomInt(10);
for (int i = 0; i < numDocs; i++) {
indexDoc(shard, "test", Integer.toString(i));
}
long globalCheckPoint = numDocs > 0 ? randomIntBetween(0, numDocs - 1) : 0;
boolean syncNeeded = numDocs > 0 && globalCheckPoint < numDocs - 1;
String allocationId = shard.routingEntry().allocationId().getId();
shard.updateAllocationIdsFromMaster(Collections.singleton(allocationId), Collections.emptySet());
shard.updateLocalCheckpointForShard(allocationId, globalCheckPoint);
assertEquals(globalCheckPoint, shard.getGlobalCheckpoint());
logger.info("Total ops: {}, global checkpoint: {}", numDocs, globalCheckPoint);
PlainActionFuture<PrimaryReplicaSyncer.ResyncTask> fut = new PlainActionFuture<>();
syncer.resync(shard, fut);
fut.get();
if (syncNeeded) {
assertTrue("Sync action was not called", syncActionCalled.get());
}
assertEquals(numDocs, fut.get().getTotalOperations());
if (syncNeeded) {
long skippedOps = globalCheckPoint + 1; // everything up to global checkpoint included
assertEquals(skippedOps, fut.get().getSkippedOperations());
assertEquals(numDocs - skippedOps, fut.get().getResyncedOperations());
} else {
assertEquals(0, fut.get().getSkippedOperations());
assertEquals(0, fut.get().getResyncedOperations());
}
closeShards(shard);
}
public void testStatusSerialization() throws IOException {
PrimaryReplicaSyncer.ResyncTask.Status status = new PrimaryReplicaSyncer.ResyncTask.Status(randomAlphaOfLength(10),
randomIntBetween(0, 1000), randomIntBetween(0, 1000), randomIntBetween(0, 1000));
final BytesStreamOutput out = new BytesStreamOutput();
status.writeTo(out);
final ByteBufferStreamInput in = new ByteBufferStreamInput(ByteBuffer.wrap(out.bytes().toBytesRef().bytes));
PrimaryReplicaSyncer.ResyncTask.Status serializedStatus = new PrimaryReplicaSyncer.ResyncTask.Status(in);
assertEquals(status, serializedStatus);
}
public void testStatusEquals() throws IOException {
PrimaryReplicaSyncer.ResyncTask task = new PrimaryReplicaSyncer.ResyncTask(0, "type", "action", "desc", null);
task.setPhase(randomAlphaOfLength(10));
task.setResyncedOperations(randomIntBetween(0, 1000));
task.setTotalOperations(randomIntBetween(0, 1000));
task.setSkippedOperations(randomIntBetween(0, 1000));
PrimaryReplicaSyncer.ResyncTask.Status status = task.getStatus();
PrimaryReplicaSyncer.ResyncTask.Status sameStatus = task.getStatus();
assertNotSame(status, sameStatus);
assertEquals(status, sameStatus);
assertEquals(status.hashCode(), sameStatus.hashCode());
switch (randomInt(3)) {
case 0: task.setPhase("otherPhase"); break;
case 1: task.setResyncedOperations(task.getResyncedOperations() + 1); break;
case 2: task.setSkippedOperations(task.getSkippedOperations() + 1); break;
case 3: task.setTotalOperations(task.getTotalOperations() + 1); break;
}
PrimaryReplicaSyncer.ResyncTask.Status differentStatus = task.getStatus();
assertNotEquals(status, differentStatus);
}
public void testStatusReportsCorrectNumbers() throws IOException {
PrimaryReplicaSyncer.ResyncTask task = new PrimaryReplicaSyncer.ResyncTask(0, "type", "action", "desc", null);
task.setPhase(randomAlphaOfLength(10));
task.setResyncedOperations(randomIntBetween(0, 1000));
task.setTotalOperations(randomIntBetween(0, 1000));
task.setSkippedOperations(randomIntBetween(0, 1000));
PrimaryReplicaSyncer.ResyncTask.Status status = task.getStatus();
XContentBuilder jsonBuilder = XContentFactory.jsonBuilder();
status.toXContent(jsonBuilder, ToXContent.EMPTY_PARAMS);
String jsonString = jsonBuilder.string();
assertThat(jsonString, containsString("\"phase\":\"" + task.getPhase() + "\""));
assertThat(jsonString, containsString("\"totalOperations\":" + task.getTotalOperations()));
assertThat(jsonString, containsString("\"resyncedOperations\":" + task.getResyncedOperations()));
assertThat(jsonString, containsString("\"skippedOperations\":" + task.getSkippedOperations()));
}
}

View File

@ -19,11 +19,13 @@
package org.elasticsearch.indices.cluster;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.CheckedBiConsumer;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
@ -33,6 +35,7 @@ import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.shard.IndexEventListener;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.shard.PrimaryReplicaSyncer.ResyncTask;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.cluster.IndicesClusterStateService.AllocatedIndex;
@ -363,8 +366,9 @@ public abstract class AbstractIndicesClusterStateServiceTestCase extends ESTestC
}
@Override
public void updatePrimaryTerm(long primaryTerm) {
term = primaryTerm;
public void updatePrimaryTerm(final long newPrimaryTerm,
CheckedBiConsumer<IndexShard, ActionListener<ResyncTask>, IOException> primaryReplicaSyncer) {
term = newPrimaryTerm;
}
@Override

View File

@ -48,6 +48,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.discovery.DiscoverySettings;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.PrimaryReplicaSyncer;
import org.elasticsearch.indices.recovery.PeerRecoveryTargetService;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.threadpool.TestThreadPool;
@ -407,6 +408,7 @@ public class IndicesClusterStateServiceRandomUpdatesTests extends AbstractIndice
final PeerRecoveryTargetService recoveryTargetService = new PeerRecoveryTargetService(settings, threadPool,
transportService, null, clusterService);
final ShardStateAction shardStateAction = mock(ShardStateAction.class);
final PrimaryReplicaSyncer primaryReplicaSyncer = mock(PrimaryReplicaSyncer.class);
return new IndicesClusterStateService(
settings,
indicesService,
@ -420,7 +422,8 @@ public class IndicesClusterStateServiceRandomUpdatesTests extends AbstractIndice
null,
null,
null,
null);
null,
primaryReplicaSyncer);
}
private class RecordingIndicesService extends MockIndicesService {