diff --git a/x-pack/plugin/src/main/java/org/elasticsearch/xpack/ccr/action/FollowExistingIndexAction.java b/x-pack/plugin/src/main/java/org/elasticsearch/xpack/ccr/action/FollowExistingIndexAction.java index 2619b03e951..c56f6d6a007 100644 --- a/x-pack/plugin/src/main/java/org/elasticsearch/xpack/ccr/action/FollowExistingIndexAction.java +++ b/x-pack/plugin/src/main/java/org/elasticsearch/xpack/ccr/action/FollowExistingIndexAction.java @@ -58,6 +58,7 @@ public class FollowExistingIndexAction extends Action>() { @Override diff --git a/x-pack/plugin/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTask.java b/x-pack/plugin/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTask.java index ace4a1ced18..68fc85b3034 100644 --- a/x-pack/plugin/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTask.java +++ b/x-pack/plugin/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTask.java @@ -30,11 +30,12 @@ public class ShardFollowTask implements PersistentTaskParams { static final ParseField LEADER_SHARD_INDEX_FIELD = new ParseField("leader_shard_index"); static final ParseField LEADER_SHARD_INDEX_UUID_FIELD = new ParseField("leader_shard_index_uuid"); static final ParseField LEADER_SHARD_SHARDID_FIELD = new ParseField("leader_shard_shard"); - static final ParseField BATCH_SIZE = new ParseField("batch_size"); + public static final ParseField MAX_CHUNK_SIZE = new ParseField("max_chunk_size"); + public static final ParseField NUM_CONCURRENT_CHUNKS = new ParseField("max_concurrent_chunks"); public static ConstructingObjectParser PARSER = new ConstructingObjectParser<>(NAME, (a) -> new ShardFollowTask(new ShardId((String) a[0], (String) a[1], (int) a[2]), - new ShardId((String) a[3], (String) a[4], (int) a[5]), (long) a[6])); + new ShardId((String) a[3], (String) a[4], (int) a[5]), (long) a[6], (int) a[7])); static { PARSER.declareString(ConstructingObjectParser.constructorArg(), FOLLOW_SHARD_INDEX_FIELD); @@ -43,23 +44,27 @@ public class ShardFollowTask implements PersistentTaskParams { PARSER.declareString(ConstructingObjectParser.constructorArg(), LEADER_SHARD_INDEX_FIELD); PARSER.declareString(ConstructingObjectParser.constructorArg(), LEADER_SHARD_INDEX_UUID_FIELD); PARSER.declareInt(ConstructingObjectParser.constructorArg(), LEADER_SHARD_SHARDID_FIELD); - PARSER.declareLong(ConstructingObjectParser.constructorArg(), BATCH_SIZE); + PARSER.declareLong(ConstructingObjectParser.constructorArg(), MAX_CHUNK_SIZE); + PARSER.declareInt(ConstructingObjectParser.constructorArg(), NUM_CONCURRENT_CHUNKS); } private final ShardId followShardId; private final ShardId leaderShardId; - private final long batchSize; + private final long maxChunkSize; + private final int numConcurrentChunks; - public ShardFollowTask(ShardId followShardId, ShardId leaderShardId, long batchSize) { + ShardFollowTask(ShardId followShardId, ShardId leaderShardId, long maxChunkSize, int numConcurrentChunks) { this.followShardId = followShardId; this.leaderShardId = leaderShardId; - this.batchSize = batchSize; + this.maxChunkSize = maxChunkSize; + this.numConcurrentChunks = numConcurrentChunks; } public ShardFollowTask(StreamInput in) throws IOException { this.followShardId = ShardId.readShardId(in); this.leaderShardId = ShardId.readShardId(in); - this.batchSize = in.readVLong(); + this.maxChunkSize = in.readVLong(); + this.numConcurrentChunks = in.readVInt(); } public ShardId getFollowShardId() { @@ -70,8 +75,12 @@ public class ShardFollowTask implements PersistentTaskParams { return leaderShardId; } - public long getBatchSize() { - return batchSize; + public long getMaxChunkSize() { + return maxChunkSize; + } + + public int getNumConcurrentChunks() { + return numConcurrentChunks; } @Override @@ -83,7 +92,8 @@ public class ShardFollowTask implements PersistentTaskParams { public void writeTo(StreamOutput out) throws IOException { followShardId.writeTo(out); leaderShardId.writeTo(out); - out.writeVLong(batchSize); + out.writeVLong(maxChunkSize); + out.writeVInt(numConcurrentChunks); } public static ShardFollowTask fromXContent(XContentParser parser) { @@ -99,7 +109,8 @@ public class ShardFollowTask implements PersistentTaskParams { builder.field(LEADER_SHARD_INDEX_FIELD.getPreferredName(), leaderShardId.getIndex().getName()); builder.field(LEADER_SHARD_INDEX_UUID_FIELD.getPreferredName(), leaderShardId.getIndex().getUUID()); builder.field(LEADER_SHARD_SHARDID_FIELD.getPreferredName(), leaderShardId.id()); - builder.field(BATCH_SIZE.getPreferredName(), batchSize); + builder.field(MAX_CHUNK_SIZE.getPreferredName(), maxChunkSize); + builder.field(NUM_CONCURRENT_CHUNKS.getPreferredName(), numConcurrentChunks); return builder.endObject(); } @@ -110,12 +121,13 @@ public class ShardFollowTask implements PersistentTaskParams { ShardFollowTask that = (ShardFollowTask) o; return Objects.equals(followShardId, that.followShardId) && Objects.equals(leaderShardId, that.leaderShardId) && - batchSize == that.batchSize; + maxChunkSize == that.maxChunkSize && + numConcurrentChunks == that.numConcurrentChunks; } @Override public int hashCode() { - return Objects.hash(followShardId, leaderShardId, batchSize); + return Objects.hash(followShardId, leaderShardId, maxChunkSize, numConcurrentChunks); } public String toString() { diff --git a/x-pack/plugin/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java b/x-pack/plugin/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java index ddd06264b2d..2d874400e92 100644 --- a/x-pack/plugin/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java +++ b/x-pack/plugin/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java @@ -5,15 +5,21 @@ */ package org.elasticsearch.xpack.ccr.action; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest; import org.elasticsearch.action.admin.indices.stats.ShardStats; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.routing.IndexRoutingTable; +import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.transport.NetworkExceptionHelper; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.AbstractRunnable; +import org.elasticsearch.common.util.concurrent.CountDown; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.tasks.Task; @@ -30,11 +36,15 @@ import java.util.Optional; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; public class ShardFollowTasksExecutor extends PersistentTasksExecutor { static final long DEFAULT_BATCH_SIZE = 1024; + static final int PROCESSOR_RETRY_LIMIT = 16; + static final int DEFAULT_CONCURRENT_PROCESSORS = 1; private static final TimeValue RETRY_TIMEOUT = TimeValue.timeValueMillis(500); private final Client client; @@ -68,15 +78,18 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor { Optional leaderShardStats = Arrays.stream(r.getIndex(leaderShard.getIndexName()).getShards()) .filter(shardStats -> shardStats.getShardRouting().shardId().equals(leaderShard)) @@ -87,7 +100,7 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor prepare(task, - leaderShard, followerShard, batchSize, leaderGlobalCheckPoint), task::markAsFailed) + task.updatePersistentStatus(newStatus, ActionListener.wrap( + persistentTask -> prepare(task, params, leaderGlobalCheckPoint), task::markAsFailed) ); } else { task.markAsFailed(e); } }; - ChunksCoordinator coordinator = - new ChunksCoordinator(client, ccrExecutor, batchSize, leaderShard, followerShard, handler); + ChunksCoordinator coordinator = new ChunksCoordinator(client, ccrExecutor, params.getMaxChunkSize(), + params.getNumConcurrentChunks(), leaderShard, followerShard, handler); coordinator.createChucks(followGlobalCheckPoint, leaderGlobalCheckPoint); - coordinator.processChuck(); + coordinator.start(); } } else { task.markAsFailed(new IllegalArgumentException("Cannot find shard stats for primary leader shard")); @@ -114,8 +127,7 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor handler; - private final Queue chunks = new ConcurrentLinkedQueue<>(); - ChunksCoordinator(Client client, Executor ccrExecutor, long batchSize, ShardId leaderShard, ShardId followerShard, - Consumer handler) { + private final CountDown countDown; + private final Queue chunks = new ConcurrentLinkedQueue<>(); + private final AtomicReference failureHolder = new AtomicReference<>(); + + ChunksCoordinator(Client client, Executor ccrExecutor, long batchSize, int concurrentProcessors, + ShardId leaderShard, ShardId followerShard, Consumer handler) { this.client = client; this.ccrExecutor = ccrExecutor; this.batchSize = batchSize; + this.concurrentProcessors = concurrentProcessors; this.leaderShard = leaderShard; this.followerShard = followerShard; this.handler = handler; + this.countDown = new CountDown(concurrentProcessors); } void createChucks(long from, long to) { + LOGGER.debug("[{}] Creating chunks for operation range [{}] to [{}]", leaderShard, from, to); for (long i = from; i < to; i += batchSize) { long v2 = i + batchSize < to ? i + batchSize : to; chunks.add(new long[]{i == from ? i : i + 1, v2}); } } - void processChuck() { + void start() { + LOGGER.debug("[{}] Start coordination of [{}] chunks with [{}] concurrent processors", + leaderShard, chunks.size(), concurrentProcessors); + for (int i = 0; i < concurrentProcessors; i++) { + ccrExecutor.execute(new AbstractRunnable() { + @Override + public void onFailure(Exception e) { + assert e != null; + LOGGER.error(() -> new ParameterizedMessage("[{}] Failure starting processor", leaderShard), e); + postProcessChuck(e); + } + + @Override + protected void doRun() throws Exception { + processNextChunk(); + } + }); + } + } + + void processNextChunk() { long[] chunk = chunks.poll(); if (chunk == null) { - handler.accept(null); + postProcessChuck(null); return; } + LOGGER.debug("[{}] Processing chunk [{}/{}]", leaderShard, chunk[0], chunk[1]); ChunkProcessor processor = new ChunkProcessor(client, ccrExecutor, leaderShard, followerShard, e -> { if (e == null) { - processChuck(); + LOGGER.debug("[{}] Successfully processed chunk [{}/{}]", leaderShard, chunk[0], chunk[1]); + processNextChunk(); } else { - handler.accept(e); + LOGGER.error(() -> new ParameterizedMessage("[{}] Failure processing chunk [{}/{}]", + leaderShard, chunk[0], chunk[1]), e); + postProcessChuck(e); } }); processor.start(chunk[0], chunk[1]); } + void postProcessChuck(Exception e) { + if (failureHolder.compareAndSet(null, e) == false) { + Exception firstFailure = failureHolder.get(); + firstFailure.addSuppressed(e); + } + if (countDown.countDown()) { + handler.accept(failureHolder.get()); + } + } + Queue getChunks() { return chunks; } @@ -187,6 +242,7 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor handler; + final AtomicInteger retryCounter = new AtomicInteger(0); ChunkProcessor(Client client, Executor ccrExecutor, ShardId leaderShard, ShardId followerShard, Consumer handler) { this.client = client; @@ -209,7 +265,16 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor client.execute(INSTANCE, request, new RestBuilderListener(channel) { @Override diff --git a/x-pack/plugin/src/test/java/org/elasticsearch/xpack/ccr/action/ChunksCoordinatorTests.java b/x-pack/plugin/src/test/java/org/elasticsearch/xpack/ccr/action/ChunksCoordinatorTests.java index c79a82adbc9..862a0530dbb 100644 --- a/x-pack/plugin/src/test/java/org/elasticsearch/xpack/ccr/action/ChunksCoordinatorTests.java +++ b/x-pack/plugin/src/test/java/org/elasticsearch/xpack/ccr/action/ChunksCoordinatorTests.java @@ -5,27 +5,49 @@ */ package org.elasticsearch.xpack.ccr.action; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.client.Client; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.translog.Translog; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.ccr.action.ShardFollowTasksExecutor.ChunkProcessor; import org.elasticsearch.xpack.ccr.action.ShardFollowTasksExecutor.ChunksCoordinator; +import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsAction; +import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsRequest; +import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsResponse; +import java.net.ConnectException; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; +import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.CoreMatchers.nullValue; +import static org.hamcrest.CoreMatchers.sameInstance; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; +import static org.mockito.Matchers.same; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; public class ChunksCoordinatorTests extends ESTestCase { public void testCreateChunks() { Client client = mock(Client.class); - Executor ccrExecutor = mock(Executor.class); + Executor ccrExecutor = Runnable::run; ShardId leaderShardId = new ShardId("index1", "index1", 0); ShardId followShardId = new ShardId("index2", "index1", 0); - ChunksCoordinator coordinator = new ChunksCoordinator(client, ccrExecutor, 1024, leaderShardId, followShardId, e -> {}); + ChunksCoordinator coordinator = new ChunksCoordinator(client, ccrExecutor, 1024, 1, leaderShardId, followShardId, e -> {}); coordinator.createChucks(0, 1024); List result = new ArrayList<>(coordinator.getChunks()); assertThat(result.size(), equalTo(1)); @@ -70,4 +92,235 @@ public class ChunksCoordinatorTests extends ESTestCase { assertThat(result.get(4)[1], equalTo(8196L)); } + public void testCoordinator() throws Exception { + Client client = mock(Client.class); + mockShardChangesApiCall(client); + mockBulkShardOperationsApiCall(client); + Executor ccrExecutor = Runnable::run; + ShardId leaderShardId = new ShardId("index1", "index1", 0); + ShardId followShardId = new ShardId("index2", "index1", 0); + + Consumer handler = e -> assertThat(e, nullValue()); + int concurrentProcessors = randomIntBetween(1, 4); + int batchSize = randomIntBetween(1, 1000); + ChunksCoordinator coordinator = new ChunksCoordinator(client, ccrExecutor, batchSize, concurrentProcessors, + leaderShardId, followShardId, handler); + + int numberOfOps = randomIntBetween(batchSize, batchSize * 20); + long from = randomInt(1000); + long to = from + numberOfOps; + coordinator.createChucks(from, to); + int expectedNumberOfChunks = numberOfOps / batchSize; + if (numberOfOps % batchSize > 0) { + expectedNumberOfChunks++; + } + assertThat(coordinator.getChunks().size(), equalTo(expectedNumberOfChunks)); + + coordinator.start(); + assertThat(coordinator.getChunks().size(), equalTo(0)); + verify(client, times(expectedNumberOfChunks)).execute(same(ShardChangesAction.INSTANCE), + any(ShardChangesAction.Request.class), any(ActionListener.class)); + verify(client, times(expectedNumberOfChunks)).execute(same(BulkShardOperationsAction.INSTANCE), + any(BulkShardOperationsRequest.class), any(ActionListener.class)); + } + + public void testCoordinator_failure() throws Exception { + Exception expectedException = new RuntimeException("throw me"); + Client client = mock(Client.class); + boolean shardChangesActionApiCallFailed; + if (randomBoolean()) { + shardChangesActionApiCallFailed = true; + doThrow(expectedException).when(client).execute(same(ShardChangesAction.INSTANCE), + any(ShardChangesAction.Request.class), any(ActionListener.class)); + } else { + shardChangesActionApiCallFailed = false; + mockShardChangesApiCall(client); + doThrow(expectedException).when(client).execute(same(BulkShardOperationsAction.INSTANCE), + any(BulkShardOperationsRequest.class), any(ActionListener.class)); + } + Executor ccrExecutor = Runnable::run; + ShardId leaderShardId = new ShardId("index1", "index1", 0); + ShardId followShardId = new ShardId("index2", "index1", 0); + + Consumer handler = e -> { + assertThat(e, notNullValue()); + assertThat(e, sameInstance(expectedException)); + }; + ChunksCoordinator coordinator = new ChunksCoordinator(client, ccrExecutor, 10, 1, leaderShardId, followShardId, handler); + coordinator.createChucks(0, 20); + assertThat(coordinator.getChunks().size(), equalTo(2)); + + coordinator.start(); + assertThat(coordinator.getChunks().size(), equalTo(1)); + verify(client, times(1)).execute(same(ShardChangesAction.INSTANCE), any(ShardChangesAction.Request.class), + any(ActionListener.class)); + verify(client, times(shardChangesActionApiCallFailed ? 0 : 1)).execute(same(BulkShardOperationsAction.INSTANCE), + any(BulkShardOperationsRequest.class), any(ActionListener.class)); + } + + public void testCoordinator_concurrent() throws Exception { + Client client = mock(Client.class); + mockShardChangesApiCall(client); + mockBulkShardOperationsApiCall(client); + Executor ccrExecutor = command -> new Thread(command).start(); + ShardId leaderShardId = new ShardId("index1", "index1", 0); + ShardId followShardId = new ShardId("index2", "index1", 0); + + AtomicBoolean calledOnceChecker = new AtomicBoolean(false); + AtomicReference failureHolder = new AtomicReference<>(); + CountDownLatch latch = new CountDownLatch(1); + Consumer handler = e -> { + if (failureHolder.compareAndSet(null, e) == false) { + // This handler should only be called once, irregardless of the number of concurrent processors + calledOnceChecker.set(true); + } + latch.countDown(); + }; + ChunksCoordinator coordinator = new ChunksCoordinator(client, ccrExecutor, 1000, 4, leaderShardId, followShardId, handler); + coordinator.createChucks(0, 1000000); + assertThat(coordinator.getChunks().size(), equalTo(1000)); + + coordinator.start(); + latch.await(); + assertThat(coordinator.getChunks().size(), equalTo(0)); + verify(client, times(1000)).execute(same(ShardChangesAction.INSTANCE), any(ShardChangesAction.Request.class), + any(ActionListener.class)); + verify(client, times(1000)).execute(same(BulkShardOperationsAction.INSTANCE), any(BulkShardOperationsRequest.class), + any(ActionListener.class)); + assertThat(calledOnceChecker.get(), is(false)); + } + + public void testChunkProcessor() { + Client client = mock(Client.class); + mockShardChangesApiCall(client); + mockBulkShardOperationsApiCall(client); + Executor ccrExecutor = Runnable::run; + ShardId leaderShardId = new ShardId("index1", "index1", 0); + ShardId followShardId = new ShardId("index2", "index1", 0); + + boolean[] invoked = new boolean[1]; + Exception[] exception = new Exception[1]; + Consumer handler = e -> {invoked[0] = true;exception[0] = e;}; + ChunkProcessor chunkProcessor = new ChunkProcessor(client, ccrExecutor, leaderShardId, followShardId, handler); + chunkProcessor.start(0, 10); + assertThat(invoked[0], is(true)); + assertThat(exception[0], nullValue()); + } + + public void testChunkProcessorRetry() { + Client client = mock(Client.class); + mockBulkShardOperationsApiCall(client); + int testRetryLimit = randomIntBetween(1, ShardFollowTasksExecutor.PROCESSOR_RETRY_LIMIT); + mockShardCangesApiCallWithRetry(client, testRetryLimit, new ConnectException("connection exception")); + + Executor ccrExecutor = Runnable::run; + ShardId leaderShardId = new ShardId("index1", "index1", 0); + ShardId followShardId = new ShardId("index2", "index1", 0); + + boolean[] invoked = new boolean[1]; + Exception[] exception = new Exception[1]; + Consumer handler = e -> {invoked[0] = true;exception[0] = e;}; + ChunkProcessor chunkProcessor = new ChunkProcessor(client, ccrExecutor, leaderShardId, followShardId, handler); + chunkProcessor.start(0, 10); + assertThat(invoked[0], is(true)); + assertThat(exception[0], nullValue()); + assertThat(chunkProcessor.retryCounter.get(), equalTo(testRetryLimit + 1)); + } + + public void testChunkProcessorRetryTooManyTimes() { + Client client = mock(Client.class); + mockBulkShardOperationsApiCall(client); + int testRetryLimit = ShardFollowTasksExecutor.PROCESSOR_RETRY_LIMIT + 1; + mockShardCangesApiCallWithRetry(client, testRetryLimit, new ConnectException("connection exception")); + + Executor ccrExecutor = Runnable::run; + ShardId leaderShardId = new ShardId("index1", "index1", 0); + ShardId followShardId = new ShardId("index2", "index1", 0); + + boolean[] invoked = new boolean[1]; + Exception[] exception = new Exception[1]; + Consumer handler = e -> {invoked[0] = true;exception[0] = e;}; + ChunkProcessor chunkProcessor = new ChunkProcessor(client, ccrExecutor, leaderShardId, followShardId, handler); + chunkProcessor.start(0, 10); + assertThat(invoked[0], is(true)); + assertThat(exception[0], notNullValue()); + assertThat(exception[0].getMessage(), equalTo("retrying failed [16] times, aborting...")); + assertThat(exception[0].getCause().getMessage(), equalTo("connection exception")); + assertThat(chunkProcessor.retryCounter.get(), equalTo(testRetryLimit)); + } + + public void testChunkProcessorNoneRetryableError() { + Client client = mock(Client.class); + mockBulkShardOperationsApiCall(client); + mockShardCangesApiCallWithRetry(client, 3, new RuntimeException("unexpected")); + + Executor ccrExecutor = Runnable::run; + ShardId leaderShardId = new ShardId("index1", "index1", 0); + ShardId followShardId = new ShardId("index2", "index1", 0); + + boolean[] invoked = new boolean[1]; + Exception[] exception = new Exception[1]; + Consumer handler = e -> {invoked[0] = true;exception[0] = e;}; + ChunkProcessor chunkProcessor = new ChunkProcessor(client, ccrExecutor, leaderShardId, followShardId, handler); + chunkProcessor.start(0, 10); + assertThat(invoked[0], is(true)); + assertThat(exception[0], notNullValue()); + assertThat(exception[0].getMessage(), equalTo("unexpected")); + assertThat(chunkProcessor.retryCounter.get(), equalTo(0)); + } + + private void mockShardCangesApiCallWithRetry(Client client, int testRetryLimit, Exception e) { + int[] retryCounter = new int[1]; + doAnswer(invocation -> { + Object[] args = invocation.getArguments(); + assert args.length == 3; + ShardChangesAction.Request request = (ShardChangesAction.Request) args[1]; + @SuppressWarnings("unchecked") + ActionListener listener = (ActionListener) args[2]; + if (retryCounter[0]++ <= testRetryLimit) { + listener.onFailure(e); + } else { + long delta = request.getMaxSeqNo() - request.getMinSeqNo(); + Translog.Operation[] operations = new Translog.Operation[(int) delta]; + for (int i = 0; i < operations.length; i++) { + operations[i] = new Translog.NoOp(request.getMinSeqNo() + i, 1, "test"); + } + ShardChangesAction.Response response = new ShardChangesAction.Response(operations); + listener.onResponse(response); + } + return null; + }).when(client).execute(same(ShardChangesAction.INSTANCE), any(ShardChangesAction.Request.class), any(ActionListener.class)); + } + + private void mockShardChangesApiCall(Client client) { + doAnswer(invocation -> { + Object[] args = invocation.getArguments(); + assert args.length == 3; + ShardChangesAction.Request request = (ShardChangesAction.Request) args[1]; + @SuppressWarnings("unchecked") + ActionListener listener = (ActionListener) args[2]; + + long delta = request.getMaxSeqNo() - request.getMinSeqNo(); + Translog.Operation[] operations = new Translog.Operation[(int) delta]; + for (int i = 0; i < operations.length; i++) { + operations[i] = new Translog.NoOp(request.getMinSeqNo() + i, 1, "test"); + } + ShardChangesAction.Response response = new ShardChangesAction.Response(operations); + listener.onResponse(response); + return null; + }).when(client).execute(same(ShardChangesAction.INSTANCE), any(ShardChangesAction.Request.class), any(ActionListener.class)); + } + + private void mockBulkShardOperationsApiCall(Client client) { + doAnswer(invocation -> { + Object[] args = invocation.getArguments(); + assert args.length == 3; + @SuppressWarnings("unchecked") + ActionListener listener = (ActionListener) args[2]; + listener.onResponse(new BulkShardOperationsResponse()); + return null; + }).when(client).execute(same(BulkShardOperationsAction.INSTANCE), any(BulkShardOperationsRequest.class), + any(ActionListener.class)); + } + } diff --git a/x-pack/plugin/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskTests.java b/x-pack/plugin/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskTests.java index a636c2f0ca4..6b7ce376f9f 100644 --- a/x-pack/plugin/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskTests.java +++ b/x-pack/plugin/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskTests.java @@ -24,7 +24,7 @@ public class ShardFollowTaskTests extends AbstractSerializingTestCase