[CCR] Initial replication group based tests (#32024)

Tests shard follow task in the context of a leader and follower ReplicationGroup,
in order to test how the shard follow logic reacts to certain shard related
failure scenarios.

More tests will need to be added, but this indicates what changes need to be made
to have these tests.

Relates to #30102
This commit is contained in:
Martijn van Groningen 2018-07-17 17:39:49 +02:00 committed by GitHub
parent 006c79a80d
commit d88c76e02b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 230 additions and 16 deletions

View File

@ -98,7 +98,7 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
protected final Index index = new Index("test", "uuid"); protected final Index index = new Index("test", "uuid");
private final ShardId shardId = new ShardId(index, 0); private final ShardId shardId = new ShardId(index, 0);
private final Map<String, String> indexMapping = Collections.singletonMap("type", "{ \"type\": {} }"); protected final Map<String, String> indexMapping = Collections.singletonMap("type", "{ \"type\": {} }");
protected ReplicationGroup createGroup(int replicas) throws IOException { protected ReplicationGroup createGroup(int replicas) throws IOException {
return createGroup(replicas, Settings.EMPTY); return createGroup(replicas, Settings.EMPTY);
@ -157,7 +157,7 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
} }
}); });
ReplicationGroup(final IndexMetaData indexMetaData) throws IOException { protected ReplicationGroup(final IndexMetaData indexMetaData) throws IOException {
final ShardRouting primaryRouting = this.createShardRouting("s0", true); final ShardRouting primaryRouting = this.createShardRouting("s0", true);
primary = newShard(primaryRouting, indexMetaData, null, getEngineFactory(primaryRouting), () -> {}); primary = newShard(primaryRouting, indexMetaData, null, getEngineFactory(primaryRouting), () -> {});
replicas = new CopyOnWriteArrayList<>(); replicas = new CopyOnWriteArrayList<>();
@ -451,7 +451,7 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
} }
} }
abstract class ReplicationAction<Request extends ReplicationRequest<Request>, protected abstract class ReplicationAction<Request extends ReplicationRequest<Request>,
ReplicaRequest extends ReplicationRequest<ReplicaRequest>, ReplicaRequest extends ReplicationRequest<ReplicaRequest>,
Response extends ReplicationResponse> { Response extends ReplicationResponse> {
private final Request request; private final Request request;
@ -459,7 +459,7 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
private final ReplicationGroup replicationGroup; private final ReplicationGroup replicationGroup;
private final String opType; private final String opType;
ReplicationAction(Request request, ActionListener<Response> listener, ReplicationGroup group, String opType) { protected ReplicationAction(Request request, ActionListener<Response> listener, ReplicationGroup group, String opType) {
this.request = request; this.request = request;
this.listener = listener; this.listener = listener;
this.replicationGroup = group; this.replicationGroup = group;
@ -585,11 +585,11 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
} }
} }
class PrimaryResult implements ReplicationOperation.PrimaryResult<ReplicaRequest> { protected class PrimaryResult implements ReplicationOperation.PrimaryResult<ReplicaRequest> {
final ReplicaRequest replicaRequest; final ReplicaRequest replicaRequest;
final Response finalResponse; final Response finalResponse;
PrimaryResult(ReplicaRequest replicaRequest, Response finalResponse) { public PrimaryResult(ReplicaRequest replicaRequest, Response finalResponse) {
this.replicaRequest = replicaRequest; this.replicaRequest = replicaRequest;
this.finalResponse = finalResponse; this.finalResponse = finalResponse;
} }

View File

@ -17,6 +17,7 @@ archivesBaseName = 'x-pack-ccr'
integTest.enabled = false integTest.enabled = false
compileJava.options.compilerArgs << "-Xlint:-try" compileJava.options.compilerArgs << "-Xlint:-try"
compileTestJava.options.compilerArgs << "-Xlint:-try"
// Instead we create a separate task to run the // Instead we create a separate task to run the
// tests based on ESIntegTestCase // tests based on ESIntegTestCase

View File

@ -70,13 +70,12 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
private final Queue<Translog.Operation> buffer = new PriorityQueue<>(Comparator.comparing(Translog.Operation::seqNo).reversed()); private final Queue<Translog.Operation> buffer = new PriorityQueue<>(Comparator.comparing(Translog.Operation::seqNo).reversed());
ShardFollowNodeTask(long id, String type, String action, String description, TaskId parentTask, Map<String, String> headers, ShardFollowNodeTask(long id, String type, String action, String description, TaskId parentTask, Map<String, String> headers,
ShardFollowTask params, BiConsumer<TimeValue, Runnable> scheduler, TimeValue idleShardChangesRequestDelay, ShardFollowTask params, BiConsumer<TimeValue, Runnable> scheduler) {
TimeValue retryTimeout) {
super(id, type, action, description, parentTask, headers); super(id, type, action, description, parentTask, headers);
this.params = params; this.params = params;
this.scheduler = scheduler; this.scheduler = scheduler;
this.retryTimeout = retryTimeout; this.retryTimeout = params.getRetryTimeout();
this.idleShardChangesRequestDelay = idleShardChangesRequestDelay; this.idleShardChangesRequestDelay = params.getIdleShardRetryDelay();
} }
void start(long followerGlobalCheckpoint) { void start(long followerGlobalCheckpoint) {

View File

@ -90,8 +90,7 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor<ShardFollo
Client followerClient = wrapClient(client, params); Client followerClient = wrapClient(client, params);
BiConsumer<TimeValue, Runnable> scheduler = BiConsumer<TimeValue, Runnable> scheduler =
(delay, command) -> threadPool.schedule(delay, Ccr.CCR_THREAD_POOL_NAME, command); (delay, command) -> threadPool.schedule(delay, Ccr.CCR_THREAD_POOL_NAME, command);
return new ShardFollowNodeTask(id, type, action, getDescription(taskInProgress), parentTaskId, headers, params, return new ShardFollowNodeTask(id, type, action, getDescription(taskInProgress), parentTaskId, headers, params, scheduler) {
scheduler, params.getIdleShardRetryDelay(), params.getRetryTimeout()) {
@Override @Override
protected void innerUpdateMapping(LongConsumer handler, Consumer<Exception> errorHandler) { protected void innerUpdateMapping(LongConsumer handler, Consumer<Exception> errorHandler) {

View File

@ -62,7 +62,8 @@ public class TransportBulkShardOperationsAction
return shardOperationOnPrimary(request.shardId(), request.getOperations(), primary, logger); return shardOperationOnPrimary(request.shardId(), request.getOperations(), primary, logger);
} }
static WritePrimaryResult<BulkShardOperationsRequest, BulkShardOperationsResponse> shardOperationOnPrimary( // public for testing purposes only
public static WritePrimaryResult<BulkShardOperationsRequest, BulkShardOperationsResponse> shardOperationOnPrimary(
final ShardId shardId, final ShardId shardId,
final List<Translog.Operation> sourceOperations, final List<Translog.Operation> sourceOperations,
final IndexShard primary, final IndexShard primary,
@ -115,7 +116,8 @@ public class TransportBulkShardOperationsAction
return new WriteReplicaResult<>(request, location, null, replica, logger); return new WriteReplicaResult<>(request, location, null, replica, logger);
} }
private static Translog.Location applyTranslogOperations( // public for testing purposes only
public static Translog.Location applyTranslogOperations(
final List<Translog.Operation> operations, final IndexShard shard, final Engine.Operation.Origin origin) throws IOException { final List<Translog.Operation> operations, final IndexShard shard, final Engine.Operation.Origin origin) throws IOException {
Translog.Location location = null; Translog.Location location = null;
for (final Translog.Operation operation : operations) { for (final Translog.Operation operation : operations) {

View File

@ -143,8 +143,7 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
AtomicInteger readCounter = new AtomicInteger(); AtomicInteger readCounter = new AtomicInteger();
AtomicInteger writeCounter = new AtomicInteger(); AtomicInteger writeCounter = new AtomicInteger();
LocalCheckpointTracker tracker = new LocalCheckpointTracker(followGlobalCheckpoint, followGlobalCheckpoint); LocalCheckpointTracker tracker = new LocalCheckpointTracker(followGlobalCheckpoint, followGlobalCheckpoint);
return new ShardFollowNodeTask(1L, "type", ShardFollowTask.NAME, "description", null, Collections.emptyMap(), params, scheduler, return new ShardFollowNodeTask(1L, "type", ShardFollowTask.NAME, "description", null, Collections.emptyMap(), params, scheduler) {
TimeValue.timeValueMillis(10), TimeValue.timeValueMillis(500)) {
@Override @Override
protected void innerUpdateMapping(LongConsumer handler, Consumer<Exception> errorHandler) { protected void innerUpdateMapping(LongConsumer handler, Consumer<Exception> errorHandler) {

View File

@ -0,0 +1,214 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ccr.action;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.replication.TransportWriteAction;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.engine.Engine.Operation.Origin;
import org.elasticsearch.index.engine.EngineFactory;
import org.elasticsearch.index.replication.ESIndexLevelReplicationTestCase;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.ccr.CcrSettings;
import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsRequest;
import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsResponse;
import org.elasticsearch.xpack.ccr.action.bulk.TransportBulkShardOperationsAction;
import org.elasticsearch.xpack.ccr.index.engine.FollowingEngineFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.LongConsumer;
public class ShardFollowTaskReplicationTests extends ESIndexLevelReplicationTestCase {
public void testSimpleCcrReplication() throws Exception {
try (ReplicationGroup leaderGroup = createGroup(randomInt(2));
ReplicationGroup followerGroup = createFollowGroup(randomInt(2))) {
leaderGroup.startAll();
int docCount = leaderGroup.appendDocs(randomInt(64));
leaderGroup.assertAllEqual(docCount);
followerGroup.startAll();
ShardFollowNodeTask shardFollowTask = createShardFollowTask(leaderGroup, followerGroup);
shardFollowTask.start(followerGroup.getPrimary().getGlobalCheckpoint());
docCount += leaderGroup.appendDocs(randomInt(128));
leaderGroup.syncGlobalCheckpoint();
leaderGroup.assertAllEqual(docCount);
int expectedCount = docCount;
assertBusy(() -> followerGroup.assertAllEqual(expectedCount));
shardFollowTask.markAsCompleted();
}
}
public void testFailLeaderReplicaShard() throws Exception {
try (ReplicationGroup leaderGroup = createGroup(1 + randomInt(1));
ReplicationGroup followerGroup = createFollowGroup(randomInt(2))) {
leaderGroup.startAll();
followerGroup.startAll();
ShardFollowNodeTask shardFollowTask = createShardFollowTask(leaderGroup, followerGroup);
shardFollowTask.start(followerGroup.getPrimary().getGlobalCheckpoint());
int docCount = 256;
leaderGroup.appendDocs(1);
Runnable task = () -> {
try {
leaderGroup.appendDocs(docCount - 1);
leaderGroup.syncGlobalCheckpoint();
} catch (Exception e) {
throw new AssertionError(e);
}
};
Thread thread = new Thread(task);
thread.start();
// Remove and add a new replica
IndexShard luckyReplica = randomFrom(leaderGroup.getReplicas());
leaderGroup.removeReplica(luckyReplica);
luckyReplica.close("stop replica", false);
luckyReplica.store().close();
leaderGroup.addReplica();
leaderGroup.startReplicas(1);
thread.join();
leaderGroup.assertAllEqual(docCount);
assertBusy(() -> followerGroup.assertAllEqual(docCount));
shardFollowTask.markAsCompleted();
}
}
@Override
protected ReplicationGroup createGroup(int replicas, Settings settings) throws IOException {
Settings newSettings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, replicas)
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)
.put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING.getKey(), 10000)
.put(settings)
.build();
if (CcrSettings.CCR_FOLLOWING_INDEX_SETTING.get(newSettings)) {
IndexMetaData metaData = buildIndexMetaData(replicas, newSettings, indexMapping);
return new ReplicationGroup(metaData) {
@Override
protected EngineFactory getEngineFactory(ShardRouting routing) {
return new FollowingEngineFactory();
}
};
} else {
return super.createGroup(replicas, newSettings);
}
}
private ReplicationGroup createFollowGroup(int replicas) throws IOException {
Settings.Builder settingsBuilder = Settings.builder();
settingsBuilder.put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true);
return createGroup(replicas, settingsBuilder.build());
}
private ShardFollowNodeTask createShardFollowTask(ReplicationGroup leaderGroup, ReplicationGroup followerGroup) {
ShardFollowTask params = new ShardFollowTask(null, new ShardId("follow_index", "", 0),
new ShardId("leader_index", "", 0), 1024, 1, Long.MAX_VALUE, 1, 10240,
TimeValue.timeValueMillis(10), TimeValue.timeValueMillis(10), Collections.emptyMap());
BiConsumer<TimeValue, Runnable> scheduler = (delay, task) -> threadPool.schedule(delay, ThreadPool.Names.GENERIC, task);
AtomicBoolean stopped = new AtomicBoolean(false);
return new ShardFollowNodeTask(1L, "type", ShardFollowTask.NAME, "description", null, Collections.emptyMap(), params, scheduler) {
@Override
protected void innerUpdateMapping(LongConsumer handler, Consumer<Exception> errorHandler) {
// noop, as mapping updates are not tested
handler.accept(1L);
}
@Override
protected void innerSendBulkShardOperationsRequest(List<Translog.Operation> operations, LongConsumer handler,
Consumer<Exception> errorHandler) {
Runnable task = () -> {
BulkShardOperationsRequest request = new BulkShardOperationsRequest(params.getFollowShardId(), operations);
ActionListener<BulkShardOperationsResponse> listener =
ActionListener.wrap(r -> handler.accept(r.getGlobalCheckpoint()), errorHandler);
new CCRAction(request, listener, followerGroup).execute();
};
threadPool.executor(ThreadPool.Names.GENERIC).execute(task);
}
@Override
protected void innerSendShardChangesRequest(long from, int maxOperationCount, Consumer<ShardChangesAction.Response> handler,
Consumer<Exception> errorHandler) {
Runnable task = () -> {
List<IndexShard> indexShards = new ArrayList<>(leaderGroup.getReplicas());
indexShards.add(leaderGroup.getPrimary());
Collections.shuffle(indexShards, random());
Exception exception = null;
for (IndexShard indexShard : indexShards) {
long globalCheckpoint = indexShard.getGlobalCheckpoint();
try {
Translog.Operation[] ops = ShardChangesAction.getOperations(indexShard, globalCheckpoint, from,
maxOperationCount, params.getMaxBatchSizeInBytes());
// Hard code index metadata version, this is ok, as mapping updates are not tested here.
handler.accept(new ShardChangesAction.Response(1L, globalCheckpoint, ops));
return;
} catch (Exception e) {
exception = e;
}
}
assert exception != null;
errorHandler.accept(exception);
};
threadPool.executor(ThreadPool.Names.GENERIC).execute(task);
}
@Override
protected boolean isStopped() {
return stopped.get();
}
@Override
public void markAsCompleted() {
stopped.set(true);
}
@Override
public void markAsFailed(Exception e) {
stopped.set(true);
}
};
}
class CCRAction extends ReplicationAction<BulkShardOperationsRequest, BulkShardOperationsRequest, BulkShardOperationsResponse> {
CCRAction(BulkShardOperationsRequest request, ActionListener<BulkShardOperationsResponse> listener, ReplicationGroup group) {
super(request, listener, group, "ccr");
}
@Override
protected PrimaryResult performOnPrimary(IndexShard primary, BulkShardOperationsRequest request) throws Exception {
TransportWriteAction.WritePrimaryResult<BulkShardOperationsRequest, BulkShardOperationsResponse> result =
TransportBulkShardOperationsAction.shardOperationOnPrimary(primary.shardId(), request.getOperations(),
primary, logger);
return new PrimaryResult(result.replicaRequest(), result.finalResponseIfSuccessful);
}
@Override
protected void performOnReplica(BulkShardOperationsRequest request, IndexShard replica) throws Exception {
TransportBulkShardOperationsAction.applyTranslogOperations(request.getOperations(), replica, Origin.REPLICA);
}
}
}