diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java index 706eae04749..0ef9e557b7c 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java @@ -151,33 +151,31 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask { // updates follower mapping, this gets us the leader mapping version and makes sure that leader and follower mapping are identical updateMapping(0L, leaderMappingVersion -> { synchronized (ShardFollowNodeTask.this) { - currentMappingVersion = leaderMappingVersion; + currentMappingVersion = Math.max(currentMappingVersion, leaderMappingVersion); } updateSettings(leaderSettingsVersion -> { synchronized (ShardFollowNodeTask.this) { - currentSettingsVersion = leaderSettingsVersion; + currentSettingsVersion = Math.max(currentSettingsVersion, leaderSettingsVersion); } - }); - updateAliases(leaderAliasesVersion -> { - synchronized (ShardFollowNodeTask.this) { - currentAliasesVersion = leaderAliasesVersion; - } - }); - synchronized (ShardFollowNodeTask.this) { - LOGGER.info( - "{} following leader shard {}, " + + updateAliases(leaderAliasesVersion -> { + synchronized (ShardFollowNodeTask.this) { + currentAliasesVersion = Math.max(currentAliasesVersion, leaderAliasesVersion); + LOGGER.info( + "{} following leader shard {}, " + "follower global checkpoint=[{}], " + "mapping version=[{}], " + "settings version=[{}], " + "aliases version=[{}]", - params.getFollowShardId(), - params.getLeaderShardId(), - followerGlobalCheckpoint, - currentMappingVersion, - currentSettingsVersion, - currentAliasesVersion); - } - coordinateReads(); + params.getFollowShardId(), + params.getLeaderShardId(), + followerGlobalCheckpoint, + currentMappingVersion, + currentSettingsVersion, + currentAliasesVersion); + } + coordinateReads(); + }); + }); }); } @@ -446,7 +444,9 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask { LOGGER.trace("{} updating mapping, mapping version [{}] is lower than minimum required mapping version [{}]", params.getFollowShardId(), currentMappingVersion, minimumRequiredMappingVersion); updateMapping(minimumRequiredMappingVersion, mappingVersion -> { - currentMappingVersion = mappingVersion; + synchronized (ShardFollowNodeTask.this) { + currentMappingVersion = Math.max(currentMappingVersion, mappingVersion); + } task.run(); }); } @@ -461,7 +461,9 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask { LOGGER.trace("{} updating settings, settings version [{}] is lower than minimum required settings version [{}]", params.getFollowShardId(), currentSettingsVersion, minimumRequiredSettingsVersion); updateSettings(settingsVersion -> { - currentSettingsVersion = settingsVersion; + synchronized (ShardFollowNodeTask.this) { + currentSettingsVersion = Math.max(currentSettingsVersion, settingsVersion); + } task.run(); }); } @@ -482,7 +484,9 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask { currentAliasesVersion, minimumRequiredAliasesVersion); updateAliases(aliasesVersion -> { - currentAliasesVersion = aliasesVersion; + synchronized (ShardFollowNodeTask.this) { + currentAliasesVersion = Math.max(currentAliasesVersion, aliasesVersion); + } task.run(); }); } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java index d540ad8f0b1..f9038075960 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java @@ -20,6 +20,8 @@ import org.elasticsearch.index.shard.ShardNotFoundException; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.Scheduler; +import org.elasticsearch.threadpool.TestThreadPool; +import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsResponse; import org.elasticsearch.xpack.core.ccr.ShardFollowNodeTaskStatus; @@ -33,6 +35,7 @@ import java.util.List; import java.util.Map; import java.util.Queue; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Phaser; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -1122,6 +1125,117 @@ public class ShardFollowNodeTaskTests extends ESTestCase { } } + public void testUpdateMappingSettingsAndAliasesConcurrently() throws Exception { + final ShardFollowTask followTask = new ShardFollowTask( + "test", + new ShardId("leader_index", "", 0), + new ShardId("follow_index", "", 0), + Integer.MAX_VALUE, + Integer.MAX_VALUE, + Integer.MAX_VALUE, + Integer.MAX_VALUE, + new ByteSizeValue(Long.MAX_VALUE), + new ByteSizeValue(Long.MAX_VALUE), + Integer.MAX_VALUE, + new ByteSizeValue(Long.MAX_VALUE), + TimeValue.ZERO, + TimeValue.ZERO, + Collections.emptyMap() + ); + final ThreadPool threadPool = new TestThreadPool(getTestClass().getSimpleName()); + final AtomicLong leaderMappingVersion = new AtomicLong(0L); + final AtomicLong followerMappingVersion = new AtomicLong(0L); + final AtomicLong leaderSettingsVersion = new AtomicLong(0L); + final AtomicLong followerSettingsVersion = new AtomicLong(0L); + final AtomicLong leaderAliasesVersion = new AtomicLong(0L); + final AtomicLong followerAliasesVersion = new AtomicLong(0L); + + final Phaser updates = new Phaser(1); + final ShardFollowNodeTask shardFollowNodeTask = new ShardFollowNodeTask( + 1L, "type", ShardFollowTask.NAME, "description", null, Collections.emptyMap(), followTask, scheduler, System::nanoTime) { + @Override + protected void innerUpdateMapping(long minRequiredMappingVersion, LongConsumer handler, Consumer errorHandler) { + updates.register(); + final long fetchedVersion = randomLongBetween(minRequiredMappingVersion, leaderMappingVersion.get()); + followerMappingVersion.updateAndGet(curr -> Math.max(curr, fetchedVersion)); + threadPool.generic().execute(() -> { + handler.accept(fetchedVersion); + updates.arriveAndDeregister(); + }); + } + + @Override + protected void innerUpdateSettings(LongConsumer handler, Consumer errorHandler) { + updates.register(); + final long fetchedVersion = randomLongBetween(0L, leaderSettingsVersion.get()); + followerSettingsVersion.updateAndGet(curr -> Math.max(curr, fetchedVersion)); + threadPool.generic().execute(() -> { + handler.accept(fetchedVersion); + updates.arriveAndDeregister(); + }); + } + + @Override + protected void innerUpdateAliases(LongConsumer handler, Consumer errorHandler) { + updates.register(); + final long fetchedVersion = randomLongBetween(0L, leaderAliasesVersion.get()); + followerAliasesVersion.updateAndGet(curr -> Math.max(curr, fetchedVersion)); + threadPool.generic().execute(() -> { + handler.accept(fetchedVersion); + updates.arriveAndDeregister(); + }); + } + + @Override + protected void innerSendBulkShardOperationsRequest(String followerHistoryUUID, + List operations, + long leaderMaxSeqNoOfUpdatesOrDeletes, + Consumer handler, + Consumer errorHandler) { + + } + + @Override + protected void innerSendShardChangesRequest(long from, int maxOperationCount, + Consumer handler, + Consumer errorHandler) { + + } + + @Override + protected Scheduler.Cancellable scheduleBackgroundRetentionLeaseRenewal(LongSupplier followerGlobalCheckpoint) { + return null; + } + + @Override + synchronized void coordinateReads() { + + } + }; + int responses = between(10, 5000); + for (int i = 0; i < responses; i++) { + ShardChangesAction.Response response = new ShardChangesAction.Response( + leaderMappingVersion.addAndGet(between(0, Integer.MAX_VALUE)), + leaderSettingsVersion.addAndGet(between(0, Integer.MAX_VALUE)), + leaderAliasesVersion.addAndGet(between(0, Integer.MAX_VALUE)), + SequenceNumbers.NO_OPS_PERFORMED, + SequenceNumbers.NO_OPS_PERFORMED, + -1, + new Translog.Operation[0], + randomLong() + ); + shardFollowNodeTask.handleReadResponse(0, -1, response); + } + try { + updates.arriveAndAwaitAdvance(); + final ShardFollowNodeTaskStatus status = shardFollowNodeTask.getStatus(); + assertThat(status.followerMappingVersion(), equalTo(followerMappingVersion.get())); + assertThat(status.followerSettingsVersion(), equalTo(followerSettingsVersion.get())); + assertThat(status.followerAliasesVersion(), equalTo(followerAliasesVersion.get())); + } finally { + terminate(threadPool); + } + } static final class ShardFollowTaskParams { private String remoteCluster = null;