diff --git a/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java index 14c0a7648fd..7e0493961ca 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java @@ -98,7 +98,7 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase protected final Index index = new Index("test", "uuid"); private final ShardId shardId = new ShardId(index, 0); - private final Map indexMapping = Collections.singletonMap("type", "{ \"type\": {} }"); + protected final Map indexMapping = Collections.singletonMap("type", "{ \"type\": {} }"); protected ReplicationGroup createGroup(int replicas) throws IOException { return createGroup(replicas, Settings.EMPTY); @@ -157,7 +157,7 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase } }); - ReplicationGroup(final IndexMetaData indexMetaData) throws IOException { + protected ReplicationGroup(final IndexMetaData indexMetaData) throws IOException { final ShardRouting primaryRouting = this.createShardRouting("s0", true); primary = newShard(primaryRouting, indexMetaData, null, getEngineFactory(primaryRouting), () -> {}); replicas = new CopyOnWriteArrayList<>(); @@ -451,7 +451,7 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase } } - abstract class ReplicationAction, + protected abstract class ReplicationAction, ReplicaRequest extends ReplicationRequest, Response extends ReplicationResponse> { private final Request request; @@ -459,7 +459,7 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase private final ReplicationGroup replicationGroup; private final String opType; - ReplicationAction(Request request, ActionListener listener, ReplicationGroup group, String opType) { + protected ReplicationAction(Request request, ActionListener listener, ReplicationGroup group, String opType) { this.request = request; this.listener = listener; this.replicationGroup = group; @@ -585,11 +585,11 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase } } - class PrimaryResult implements ReplicationOperation.PrimaryResult { + protected class PrimaryResult implements ReplicationOperation.PrimaryResult { final ReplicaRequest replicaRequest; final Response finalResponse; - PrimaryResult(ReplicaRequest replicaRequest, Response finalResponse) { + public PrimaryResult(ReplicaRequest replicaRequest, Response finalResponse) { this.replicaRequest = replicaRequest; this.finalResponse = finalResponse; } diff --git a/x-pack/plugin/ccr/build.gradle b/x-pack/plugin/ccr/build.gradle index 1c2c4539a7c..001007415e0 100644 --- a/x-pack/plugin/ccr/build.gradle +++ b/x-pack/plugin/ccr/build.gradle @@ -17,6 +17,7 @@ archivesBaseName = 'x-pack-ccr' integTest.enabled = false compileJava.options.compilerArgs << "-Xlint:-try" +compileTestJava.options.compilerArgs << "-Xlint:-try" // Instead we create a separate task to run the // tests based on ESIntegTestCase diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java index 63077097319..97c1c9fcd21 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java @@ -70,13 +70,12 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask { private final Queue buffer = new PriorityQueue<>(Comparator.comparing(Translog.Operation::seqNo).reversed()); ShardFollowNodeTask(long id, String type, String action, String description, TaskId parentTask, Map headers, - ShardFollowTask params, BiConsumer scheduler, TimeValue idleShardChangesRequestDelay, - TimeValue retryTimeout) { + ShardFollowTask params, BiConsumer scheduler) { super(id, type, action, description, parentTask, headers); this.params = params; this.scheduler = scheduler; - this.retryTimeout = retryTimeout; - this.idleShardChangesRequestDelay = idleShardChangesRequestDelay; + this.retryTimeout = params.getRetryTimeout(); + this.idleShardChangesRequestDelay = params.getIdleShardRetryDelay(); } void start(long followerGlobalCheckpoint) { diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java index e80c7a34ee6..a3ad7aa547f 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java @@ -90,8 +90,7 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor scheduler = (delay, command) -> threadPool.schedule(delay, Ccr.CCR_THREAD_POOL_NAME, command); - return new ShardFollowNodeTask(id, type, action, getDescription(taskInProgress), parentTaskId, headers, params, - scheduler, params.getIdleShardRetryDelay(), params.getRetryTimeout()) { + return new ShardFollowNodeTask(id, type, action, getDescription(taskInProgress), parentTaskId, headers, params, scheduler) { @Override protected void innerUpdateMapping(LongConsumer handler, Consumer errorHandler) { diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java index 0770c871356..5b648a4cf8d 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java @@ -62,7 +62,8 @@ public class TransportBulkShardOperationsAction return shardOperationOnPrimary(request.shardId(), request.getOperations(), primary, logger); } - static WritePrimaryResult shardOperationOnPrimary( + // public for testing purposes only + public static WritePrimaryResult shardOperationOnPrimary( final ShardId shardId, final List sourceOperations, final IndexShard primary, @@ -115,7 +116,8 @@ public class TransportBulkShardOperationsAction return new WriteReplicaResult<>(request, location, null, replica, logger); } - private static Translog.Location applyTranslogOperations( + // public for testing purposes only + public static Translog.Location applyTranslogOperations( final List operations, final IndexShard shard, final Engine.Operation.Origin origin) throws IOException { Translog.Location location = null; for (final Translog.Operation operation : operations) { diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java index 213fca4542f..351044cadcc 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java @@ -143,8 +143,7 @@ public class ShardFollowNodeTaskTests extends ESTestCase { AtomicInteger readCounter = new AtomicInteger(); AtomicInteger writeCounter = new AtomicInteger(); LocalCheckpointTracker tracker = new LocalCheckpointTracker(followGlobalCheckpoint, followGlobalCheckpoint); - return new ShardFollowNodeTask(1L, "type", ShardFollowTask.NAME, "description", null, Collections.emptyMap(), params, scheduler, - TimeValue.timeValueMillis(10), TimeValue.timeValueMillis(500)) { + return new ShardFollowNodeTask(1L, "type", ShardFollowTask.NAME, "description", null, Collections.emptyMap(), params, scheduler) { @Override protected void innerUpdateMapping(LongConsumer handler, Consumer errorHandler) { diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java new file mode 100644 index 00000000000..da957e7ee5e --- /dev/null +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java @@ -0,0 +1,214 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ccr.action; + +import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.replication.TransportWriteAction; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.engine.Engine.Operation.Origin; +import org.elasticsearch.index.engine.EngineFactory; +import org.elasticsearch.index.replication.ESIndexLevelReplicationTestCase; +import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.translog.Translog; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.ccr.CcrSettings; +import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsRequest; +import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsResponse; +import org.elasticsearch.xpack.ccr.action.bulk.TransportBulkShardOperationsAction; +import org.elasticsearch.xpack.ccr.index.engine.FollowingEngineFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.BiConsumer; +import java.util.function.Consumer; +import java.util.function.LongConsumer; + +public class ShardFollowTaskReplicationTests extends ESIndexLevelReplicationTestCase { + + public void testSimpleCcrReplication() throws Exception { + try (ReplicationGroup leaderGroup = createGroup(randomInt(2)); + ReplicationGroup followerGroup = createFollowGroup(randomInt(2))) { + leaderGroup.startAll(); + int docCount = leaderGroup.appendDocs(randomInt(64)); + leaderGroup.assertAllEqual(docCount); + followerGroup.startAll(); + ShardFollowNodeTask shardFollowTask = createShardFollowTask(leaderGroup, followerGroup); + shardFollowTask.start(followerGroup.getPrimary().getGlobalCheckpoint()); + docCount += leaderGroup.appendDocs(randomInt(128)); + leaderGroup.syncGlobalCheckpoint(); + + leaderGroup.assertAllEqual(docCount); + int expectedCount = docCount; + assertBusy(() -> followerGroup.assertAllEqual(expectedCount)); + shardFollowTask.markAsCompleted(); + } + } + + public void testFailLeaderReplicaShard() throws Exception { + try (ReplicationGroup leaderGroup = createGroup(1 + randomInt(1)); + ReplicationGroup followerGroup = createFollowGroup(randomInt(2))) { + leaderGroup.startAll(); + followerGroup.startAll(); + ShardFollowNodeTask shardFollowTask = createShardFollowTask(leaderGroup, followerGroup); + shardFollowTask.start(followerGroup.getPrimary().getGlobalCheckpoint()); + int docCount = 256; + leaderGroup.appendDocs(1); + Runnable task = () -> { + try { + leaderGroup.appendDocs(docCount - 1); + leaderGroup.syncGlobalCheckpoint(); + } catch (Exception e) { + throw new AssertionError(e); + } + }; + Thread thread = new Thread(task); + thread.start(); + + // Remove and add a new replica + IndexShard luckyReplica = randomFrom(leaderGroup.getReplicas()); + leaderGroup.removeReplica(luckyReplica); + luckyReplica.close("stop replica", false); + luckyReplica.store().close(); + leaderGroup.addReplica(); + leaderGroup.startReplicas(1); + thread.join(); + + leaderGroup.assertAllEqual(docCount); + assertBusy(() -> followerGroup.assertAllEqual(docCount)); + shardFollowTask.markAsCompleted(); + } + } + + @Override + protected ReplicationGroup createGroup(int replicas, Settings settings) throws IOException { + Settings newSettings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, replicas) + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true) + .put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING.getKey(), 10000) + .put(settings) + .build(); + if (CcrSettings.CCR_FOLLOWING_INDEX_SETTING.get(newSettings)) { + IndexMetaData metaData = buildIndexMetaData(replicas, newSettings, indexMapping); + return new ReplicationGroup(metaData) { + + @Override + protected EngineFactory getEngineFactory(ShardRouting routing) { + return new FollowingEngineFactory(); + } + }; + } else { + return super.createGroup(replicas, newSettings); + } + } + + private ReplicationGroup createFollowGroup(int replicas) throws IOException { + Settings.Builder settingsBuilder = Settings.builder(); + settingsBuilder.put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true); + return createGroup(replicas, settingsBuilder.build()); + } + + private ShardFollowNodeTask createShardFollowTask(ReplicationGroup leaderGroup, ReplicationGroup followerGroup) { + ShardFollowTask params = new ShardFollowTask(null, new ShardId("follow_index", "", 0), + new ShardId("leader_index", "", 0), 1024, 1, Long.MAX_VALUE, 1, 10240, + TimeValue.timeValueMillis(10), TimeValue.timeValueMillis(10), Collections.emptyMap()); + + BiConsumer scheduler = (delay, task) -> threadPool.schedule(delay, ThreadPool.Names.GENERIC, task); + AtomicBoolean stopped = new AtomicBoolean(false); + return new ShardFollowNodeTask(1L, "type", ShardFollowTask.NAME, "description", null, Collections.emptyMap(), params, scheduler) { + @Override + protected void innerUpdateMapping(LongConsumer handler, Consumer errorHandler) { + // noop, as mapping updates are not tested + handler.accept(1L); + } + + @Override + protected void innerSendBulkShardOperationsRequest(List operations, LongConsumer handler, + Consumer errorHandler) { + Runnable task = () -> { + BulkShardOperationsRequest request = new BulkShardOperationsRequest(params.getFollowShardId(), operations); + ActionListener listener = + ActionListener.wrap(r -> handler.accept(r.getGlobalCheckpoint()), errorHandler); + new CCRAction(request, listener, followerGroup).execute(); + }; + threadPool.executor(ThreadPool.Names.GENERIC).execute(task); + } + + @Override + protected void innerSendShardChangesRequest(long from, int maxOperationCount, Consumer handler, + Consumer errorHandler) { + Runnable task = () -> { + List indexShards = new ArrayList<>(leaderGroup.getReplicas()); + indexShards.add(leaderGroup.getPrimary()); + Collections.shuffle(indexShards, random()); + + Exception exception = null; + for (IndexShard indexShard : indexShards) { + long globalCheckpoint = indexShard.getGlobalCheckpoint(); + try { + Translog.Operation[] ops = ShardChangesAction.getOperations(indexShard, globalCheckpoint, from, + maxOperationCount, params.getMaxBatchSizeInBytes()); + // Hard code index metadata version, this is ok, as mapping updates are not tested here. + handler.accept(new ShardChangesAction.Response(1L, globalCheckpoint, ops)); + return; + } catch (Exception e) { + exception = e; + } + } + assert exception != null; + errorHandler.accept(exception); + }; + threadPool.executor(ThreadPool.Names.GENERIC).execute(task); + } + + @Override + protected boolean isStopped() { + return stopped.get(); + } + + @Override + public void markAsCompleted() { + stopped.set(true); + } + + @Override + public void markAsFailed(Exception e) { + stopped.set(true); + } + + }; + } + + class CCRAction extends ReplicationAction { + + CCRAction(BulkShardOperationsRequest request, ActionListener listener, ReplicationGroup group) { + super(request, listener, group, "ccr"); + } + + @Override + protected PrimaryResult performOnPrimary(IndexShard primary, BulkShardOperationsRequest request) throws Exception { + TransportWriteAction.WritePrimaryResult result = + TransportBulkShardOperationsAction.shardOperationOnPrimary(primary.shardId(), request.getOperations(), + primary, logger); + return new PrimaryResult(result.replicaRequest(), result.finalResponseIfSuccessful); + } + + @Override + protected void performOnReplica(BulkShardOperationsRequest request, IndexShard replica) throws Exception { + TransportBulkShardOperationsAction.applyTranslogOperations(request.getOperations(), replica, Origin.REPLICA); + } + } + +}