From d88c76e02b28928003ec7fff439ec73741901703 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Tue, 17 Jul 2018 17:39:49 +0200 Subject: [PATCH] [CCR] Initial replication group based tests (#32024) Tests shard follow task in the context of a leader and follower ReplicationGroup, in order to test how the shard follow logic reacts to certain shard related failure scenarios. More tests will need to be added, but this indicates what changes need to be made to have these tests. Relates to #30102 --- .../ESIndexLevelReplicationTestCase.java | 12 +- x-pack/plugin/ccr/build.gradle | 1 + .../xpack/ccr/action/ShardFollowNodeTask.java | 7 +- .../ccr/action/ShardFollowTasksExecutor.java | 3 +- .../TransportBulkShardOperationsAction.java | 6 +- .../ccr/action/ShardFollowNodeTaskTests.java | 3 +- .../ShardFollowTaskReplicationTests.java | 214 ++++++++++++++++++ 7 files changed, 230 insertions(+), 16 deletions(-) create mode 100644 x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java diff --git a/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java index 14c0a7648fd..7e0493961ca 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java @@ -98,7 +98,7 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase protected final Index index = new Index("test", "uuid"); private final ShardId shardId = new ShardId(index, 0); - private final Map indexMapping = Collections.singletonMap("type", "{ \"type\": {} }"); + protected final Map indexMapping = Collections.singletonMap("type", "{ \"type\": {} }"); protected ReplicationGroup createGroup(int replicas) throws IOException { return createGroup(replicas, Settings.EMPTY); @@ -157,7 +157,7 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase } }); - ReplicationGroup(final IndexMetaData indexMetaData) throws IOException { + protected ReplicationGroup(final IndexMetaData indexMetaData) throws IOException { final ShardRouting primaryRouting = this.createShardRouting("s0", true); primary = newShard(primaryRouting, indexMetaData, null, getEngineFactory(primaryRouting), () -> {}); replicas = new CopyOnWriteArrayList<>(); @@ -451,7 +451,7 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase } } - abstract class ReplicationAction, + protected abstract class ReplicationAction, ReplicaRequest extends ReplicationRequest, Response extends ReplicationResponse> { private final Request request; @@ -459,7 +459,7 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase private final ReplicationGroup replicationGroup; private final String opType; - ReplicationAction(Request request, ActionListener listener, ReplicationGroup group, String opType) { + protected ReplicationAction(Request request, ActionListener listener, ReplicationGroup group, String opType) { this.request = request; this.listener = listener; this.replicationGroup = group; @@ -585,11 +585,11 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase } } - class PrimaryResult implements ReplicationOperation.PrimaryResult { + protected class PrimaryResult implements ReplicationOperation.PrimaryResult { final ReplicaRequest replicaRequest; final Response finalResponse; - PrimaryResult(ReplicaRequest replicaRequest, Response finalResponse) { + public PrimaryResult(ReplicaRequest replicaRequest, Response finalResponse) { this.replicaRequest = replicaRequest; this.finalResponse = finalResponse; } diff --git a/x-pack/plugin/ccr/build.gradle b/x-pack/plugin/ccr/build.gradle index 1c2c4539a7c..001007415e0 100644 --- a/x-pack/plugin/ccr/build.gradle +++ b/x-pack/plugin/ccr/build.gradle @@ -17,6 +17,7 @@ archivesBaseName = 'x-pack-ccr' integTest.enabled = false compileJava.options.compilerArgs << "-Xlint:-try" +compileTestJava.options.compilerArgs << "-Xlint:-try" // Instead we create a separate task to run the // tests based on ESIntegTestCase diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java index 63077097319..97c1c9fcd21 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java @@ -70,13 +70,12 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask { private final Queue buffer = new PriorityQueue<>(Comparator.comparing(Translog.Operation::seqNo).reversed()); ShardFollowNodeTask(long id, String type, String action, String description, TaskId parentTask, Map headers, - ShardFollowTask params, BiConsumer scheduler, TimeValue idleShardChangesRequestDelay, - TimeValue retryTimeout) { + ShardFollowTask params, BiConsumer scheduler) { super(id, type, action, description, parentTask, headers); this.params = params; this.scheduler = scheduler; - this.retryTimeout = retryTimeout; - this.idleShardChangesRequestDelay = idleShardChangesRequestDelay; + this.retryTimeout = params.getRetryTimeout(); + this.idleShardChangesRequestDelay = params.getIdleShardRetryDelay(); } void start(long followerGlobalCheckpoint) { diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java index e80c7a34ee6..a3ad7aa547f 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java @@ -90,8 +90,7 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor scheduler = (delay, command) -> threadPool.schedule(delay, Ccr.CCR_THREAD_POOL_NAME, command); - return new ShardFollowNodeTask(id, type, action, getDescription(taskInProgress), parentTaskId, headers, params, - scheduler, params.getIdleShardRetryDelay(), params.getRetryTimeout()) { + return new ShardFollowNodeTask(id, type, action, getDescription(taskInProgress), parentTaskId, headers, params, scheduler) { @Override protected void innerUpdateMapping(LongConsumer handler, Consumer errorHandler) { diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java index 0770c871356..5b648a4cf8d 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java @@ -62,7 +62,8 @@ public class TransportBulkShardOperationsAction return shardOperationOnPrimary(request.shardId(), request.getOperations(), primary, logger); } - static WritePrimaryResult shardOperationOnPrimary( + // public for testing purposes only + public static WritePrimaryResult shardOperationOnPrimary( final ShardId shardId, final List sourceOperations, final IndexShard primary, @@ -115,7 +116,8 @@ public class TransportBulkShardOperationsAction return new WriteReplicaResult<>(request, location, null, replica, logger); } - private static Translog.Location applyTranslogOperations( + // public for testing purposes only + public static Translog.Location applyTranslogOperations( final List operations, final IndexShard shard, final Engine.Operation.Origin origin) throws IOException { Translog.Location location = null; for (final Translog.Operation operation : operations) { diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java index 213fca4542f..351044cadcc 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java @@ -143,8 +143,7 @@ public class ShardFollowNodeTaskTests extends ESTestCase { AtomicInteger readCounter = new AtomicInteger(); AtomicInteger writeCounter = new AtomicInteger(); LocalCheckpointTracker tracker = new LocalCheckpointTracker(followGlobalCheckpoint, followGlobalCheckpoint); - return new ShardFollowNodeTask(1L, "type", ShardFollowTask.NAME, "description", null, Collections.emptyMap(), params, scheduler, - TimeValue.timeValueMillis(10), TimeValue.timeValueMillis(500)) { + return new ShardFollowNodeTask(1L, "type", ShardFollowTask.NAME, "description", null, Collections.emptyMap(), params, scheduler) { @Override protected void innerUpdateMapping(LongConsumer handler, Consumer errorHandler) { diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java new file mode 100644 index 00000000000..da957e7ee5e --- /dev/null +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java @@ -0,0 +1,214 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ccr.action; + +import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.replication.TransportWriteAction; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.engine.Engine.Operation.Origin; +import org.elasticsearch.index.engine.EngineFactory; +import org.elasticsearch.index.replication.ESIndexLevelReplicationTestCase; +import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.translog.Translog; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.ccr.CcrSettings; +import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsRequest; +import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsResponse; +import org.elasticsearch.xpack.ccr.action.bulk.TransportBulkShardOperationsAction; +import org.elasticsearch.xpack.ccr.index.engine.FollowingEngineFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.BiConsumer; +import java.util.function.Consumer; +import java.util.function.LongConsumer; + +public class ShardFollowTaskReplicationTests extends ESIndexLevelReplicationTestCase { + + public void testSimpleCcrReplication() throws Exception { + try (ReplicationGroup leaderGroup = createGroup(randomInt(2)); + ReplicationGroup followerGroup = createFollowGroup(randomInt(2))) { + leaderGroup.startAll(); + int docCount = leaderGroup.appendDocs(randomInt(64)); + leaderGroup.assertAllEqual(docCount); + followerGroup.startAll(); + ShardFollowNodeTask shardFollowTask = createShardFollowTask(leaderGroup, followerGroup); + shardFollowTask.start(followerGroup.getPrimary().getGlobalCheckpoint()); + docCount += leaderGroup.appendDocs(randomInt(128)); + leaderGroup.syncGlobalCheckpoint(); + + leaderGroup.assertAllEqual(docCount); + int expectedCount = docCount; + assertBusy(() -> followerGroup.assertAllEqual(expectedCount)); + shardFollowTask.markAsCompleted(); + } + } + + public void testFailLeaderReplicaShard() throws Exception { + try (ReplicationGroup leaderGroup = createGroup(1 + randomInt(1)); + ReplicationGroup followerGroup = createFollowGroup(randomInt(2))) { + leaderGroup.startAll(); + followerGroup.startAll(); + ShardFollowNodeTask shardFollowTask = createShardFollowTask(leaderGroup, followerGroup); + shardFollowTask.start(followerGroup.getPrimary().getGlobalCheckpoint()); + int docCount = 256; + leaderGroup.appendDocs(1); + Runnable task = () -> { + try { + leaderGroup.appendDocs(docCount - 1); + leaderGroup.syncGlobalCheckpoint(); + } catch (Exception e) { + throw new AssertionError(e); + } + }; + Thread thread = new Thread(task); + thread.start(); + + // Remove and add a new replica + IndexShard luckyReplica = randomFrom(leaderGroup.getReplicas()); + leaderGroup.removeReplica(luckyReplica); + luckyReplica.close("stop replica", false); + luckyReplica.store().close(); + leaderGroup.addReplica(); + leaderGroup.startReplicas(1); + thread.join(); + + leaderGroup.assertAllEqual(docCount); + assertBusy(() -> followerGroup.assertAllEqual(docCount)); + shardFollowTask.markAsCompleted(); + } + } + + @Override + protected ReplicationGroup createGroup(int replicas, Settings settings) throws IOException { + Settings newSettings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, replicas) + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true) + .put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING.getKey(), 10000) + .put(settings) + .build(); + if (CcrSettings.CCR_FOLLOWING_INDEX_SETTING.get(newSettings)) { + IndexMetaData metaData = buildIndexMetaData(replicas, newSettings, indexMapping); + return new ReplicationGroup(metaData) { + + @Override + protected EngineFactory getEngineFactory(ShardRouting routing) { + return new FollowingEngineFactory(); + } + }; + } else { + return super.createGroup(replicas, newSettings); + } + } + + private ReplicationGroup createFollowGroup(int replicas) throws IOException { + Settings.Builder settingsBuilder = Settings.builder(); + settingsBuilder.put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true); + return createGroup(replicas, settingsBuilder.build()); + } + + private ShardFollowNodeTask createShardFollowTask(ReplicationGroup leaderGroup, ReplicationGroup followerGroup) { + ShardFollowTask params = new ShardFollowTask(null, new ShardId("follow_index", "", 0), + new ShardId("leader_index", "", 0), 1024, 1, Long.MAX_VALUE, 1, 10240, + TimeValue.timeValueMillis(10), TimeValue.timeValueMillis(10), Collections.emptyMap()); + + BiConsumer scheduler = (delay, task) -> threadPool.schedule(delay, ThreadPool.Names.GENERIC, task); + AtomicBoolean stopped = new AtomicBoolean(false); + return new ShardFollowNodeTask(1L, "type", ShardFollowTask.NAME, "description", null, Collections.emptyMap(), params, scheduler) { + @Override + protected void innerUpdateMapping(LongConsumer handler, Consumer errorHandler) { + // noop, as mapping updates are not tested + handler.accept(1L); + } + + @Override + protected void innerSendBulkShardOperationsRequest(List operations, LongConsumer handler, + Consumer errorHandler) { + Runnable task = () -> { + BulkShardOperationsRequest request = new BulkShardOperationsRequest(params.getFollowShardId(), operations); + ActionListener listener = + ActionListener.wrap(r -> handler.accept(r.getGlobalCheckpoint()), errorHandler); + new CCRAction(request, listener, followerGroup).execute(); + }; + threadPool.executor(ThreadPool.Names.GENERIC).execute(task); + } + + @Override + protected void innerSendShardChangesRequest(long from, int maxOperationCount, Consumer handler, + Consumer errorHandler) { + Runnable task = () -> { + List indexShards = new ArrayList<>(leaderGroup.getReplicas()); + indexShards.add(leaderGroup.getPrimary()); + Collections.shuffle(indexShards, random()); + + Exception exception = null; + for (IndexShard indexShard : indexShards) { + long globalCheckpoint = indexShard.getGlobalCheckpoint(); + try { + Translog.Operation[] ops = ShardChangesAction.getOperations(indexShard, globalCheckpoint, from, + maxOperationCount, params.getMaxBatchSizeInBytes()); + // Hard code index metadata version, this is ok, as mapping updates are not tested here. + handler.accept(new ShardChangesAction.Response(1L, globalCheckpoint, ops)); + return; + } catch (Exception e) { + exception = e; + } + } + assert exception != null; + errorHandler.accept(exception); + }; + threadPool.executor(ThreadPool.Names.GENERIC).execute(task); + } + + @Override + protected boolean isStopped() { + return stopped.get(); + } + + @Override + public void markAsCompleted() { + stopped.set(true); + } + + @Override + public void markAsFailed(Exception e) { + stopped.set(true); + } + + }; + } + + class CCRAction extends ReplicationAction { + + CCRAction(BulkShardOperationsRequest request, ActionListener listener, ReplicationGroup group) { + super(request, listener, group, "ccr"); + } + + @Override + protected PrimaryResult performOnPrimary(IndexShard primary, BulkShardOperationsRequest request) throws Exception { + TransportWriteAction.WritePrimaryResult result = + TransportBulkShardOperationsAction.shardOperationOnPrimary(primary.shardId(), request.getOperations(), + primary, logger); + return new PrimaryResult(result.replicaRequest(), result.finalResponseIfSuccessful); + } + + @Override + protected void performOnReplica(BulkShardOperationsRequest request, IndexShard replica) throws Exception { + TransportBulkShardOperationsAction.applyTranslogOperations(request.getOperations(), replica, Origin.REPLICA); + } + } + +}