Fix synchronization in ShardFollowNodeTask (#60490)

The leader mapping, settings, and aliases versions in a shard follow-task 
are updated without proper synchronization and can go backward.
This commit is contained in:
Nhat Nguyen 2020-07-31 09:48:16 -04:00
parent 2fa6448a15
commit bf7eecf1dc
2 changed files with 140 additions and 22 deletions

View File

@ -151,33 +151,31 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
// updates follower mapping, this gets us the leader mapping version and makes sure that leader and follower mapping are identical // updates follower mapping, this gets us the leader mapping version and makes sure that leader and follower mapping are identical
updateMapping(0L, leaderMappingVersion -> { updateMapping(0L, leaderMappingVersion -> {
synchronized (ShardFollowNodeTask.this) { synchronized (ShardFollowNodeTask.this) {
currentMappingVersion = leaderMappingVersion; currentMappingVersion = Math.max(currentMappingVersion, leaderMappingVersion);
} }
updateSettings(leaderSettingsVersion -> { updateSettings(leaderSettingsVersion -> {
synchronized (ShardFollowNodeTask.this) { synchronized (ShardFollowNodeTask.this) {
currentSettingsVersion = leaderSettingsVersion; currentSettingsVersion = Math.max(currentSettingsVersion, leaderSettingsVersion);
} }
}); updateAliases(leaderAliasesVersion -> {
updateAliases(leaderAliasesVersion -> { synchronized (ShardFollowNodeTask.this) {
synchronized (ShardFollowNodeTask.this) { currentAliasesVersion = Math.max(currentAliasesVersion, leaderAliasesVersion);
currentAliasesVersion = leaderAliasesVersion; LOGGER.info(
} "{} following leader shard {}, " +
});
synchronized (ShardFollowNodeTask.this) {
LOGGER.info(
"{} following leader shard {}, " +
"follower global checkpoint=[{}], " + "follower global checkpoint=[{}], " +
"mapping version=[{}], " + "mapping version=[{}], " +
"settings version=[{}], " + "settings version=[{}], " +
"aliases version=[{}]", "aliases version=[{}]",
params.getFollowShardId(), params.getFollowShardId(),
params.getLeaderShardId(), params.getLeaderShardId(),
followerGlobalCheckpoint, followerGlobalCheckpoint,
currentMappingVersion, currentMappingVersion,
currentSettingsVersion, currentSettingsVersion,
currentAliasesVersion); currentAliasesVersion);
} }
coordinateReads(); coordinateReads();
});
});
}); });
} }
@ -446,7 +444,9 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
LOGGER.trace("{} updating mapping, mapping version [{}] is lower than minimum required mapping version [{}]", LOGGER.trace("{} updating mapping, mapping version [{}] is lower than minimum required mapping version [{}]",
params.getFollowShardId(), currentMappingVersion, minimumRequiredMappingVersion); params.getFollowShardId(), currentMappingVersion, minimumRequiredMappingVersion);
updateMapping(minimumRequiredMappingVersion, mappingVersion -> { updateMapping(minimumRequiredMappingVersion, mappingVersion -> {
currentMappingVersion = mappingVersion; synchronized (ShardFollowNodeTask.this) {
currentMappingVersion = Math.max(currentMappingVersion, mappingVersion);
}
task.run(); task.run();
}); });
} }
@ -461,7 +461,9 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
LOGGER.trace("{} updating settings, settings version [{}] is lower than minimum required settings version [{}]", LOGGER.trace("{} updating settings, settings version [{}] is lower than minimum required settings version [{}]",
params.getFollowShardId(), currentSettingsVersion, minimumRequiredSettingsVersion); params.getFollowShardId(), currentSettingsVersion, minimumRequiredSettingsVersion);
updateSettings(settingsVersion -> { updateSettings(settingsVersion -> {
currentSettingsVersion = settingsVersion; synchronized (ShardFollowNodeTask.this) {
currentSettingsVersion = Math.max(currentSettingsVersion, settingsVersion);
}
task.run(); task.run();
}); });
} }
@ -482,7 +484,9 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
currentAliasesVersion, currentAliasesVersion,
minimumRequiredAliasesVersion); minimumRequiredAliasesVersion);
updateAliases(aliasesVersion -> { updateAliases(aliasesVersion -> {
currentAliasesVersion = aliasesVersion; synchronized (ShardFollowNodeTask.this) {
currentAliasesVersion = Math.max(currentAliasesVersion, aliasesVersion);
}
task.run(); task.run();
}); });
} }

View File

@ -20,6 +20,8 @@ import org.elasticsearch.index.shard.ShardNotFoundException;
import org.elasticsearch.index.translog.Translog; import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.Scheduler; import org.elasticsearch.threadpool.Scheduler;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsResponse; import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsResponse;
import org.elasticsearch.xpack.core.ccr.ShardFollowNodeTaskStatus; import org.elasticsearch.xpack.core.ccr.ShardFollowNodeTaskStatus;
@ -33,6 +35,7 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Queue; import java.util.Queue;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Phaser;
import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -1122,6 +1125,117 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
} }
} }
public void testUpdateMappingSettingsAndAliasesConcurrently() throws Exception {
final ShardFollowTask followTask = new ShardFollowTask(
"test",
new ShardId("leader_index", "", 0),
new ShardId("follow_index", "", 0),
Integer.MAX_VALUE,
Integer.MAX_VALUE,
Integer.MAX_VALUE,
Integer.MAX_VALUE,
new ByteSizeValue(Long.MAX_VALUE),
new ByteSizeValue(Long.MAX_VALUE),
Integer.MAX_VALUE,
new ByteSizeValue(Long.MAX_VALUE),
TimeValue.ZERO,
TimeValue.ZERO,
Collections.emptyMap()
);
final ThreadPool threadPool = new TestThreadPool(getTestClass().getSimpleName());
final AtomicLong leaderMappingVersion = new AtomicLong(0L);
final AtomicLong followerMappingVersion = new AtomicLong(0L);
final AtomicLong leaderSettingsVersion = new AtomicLong(0L);
final AtomicLong followerSettingsVersion = new AtomicLong(0L);
final AtomicLong leaderAliasesVersion = new AtomicLong(0L);
final AtomicLong followerAliasesVersion = new AtomicLong(0L);
final Phaser updates = new Phaser(1);
final ShardFollowNodeTask shardFollowNodeTask = new ShardFollowNodeTask(
1L, "type", ShardFollowTask.NAME, "description", null, Collections.emptyMap(), followTask, scheduler, System::nanoTime) {
@Override
protected void innerUpdateMapping(long minRequiredMappingVersion, LongConsumer handler, Consumer<Exception> errorHandler) {
updates.register();
final long fetchedVersion = randomLongBetween(minRequiredMappingVersion, leaderMappingVersion.get());
followerMappingVersion.updateAndGet(curr -> Math.max(curr, fetchedVersion));
threadPool.generic().execute(() -> {
handler.accept(fetchedVersion);
updates.arriveAndDeregister();
});
}
@Override
protected void innerUpdateSettings(LongConsumer handler, Consumer<Exception> errorHandler) {
updates.register();
final long fetchedVersion = randomLongBetween(0L, leaderSettingsVersion.get());
followerSettingsVersion.updateAndGet(curr -> Math.max(curr, fetchedVersion));
threadPool.generic().execute(() -> {
handler.accept(fetchedVersion);
updates.arriveAndDeregister();
});
}
@Override
protected void innerUpdateAliases(LongConsumer handler, Consumer<Exception> errorHandler) {
updates.register();
final long fetchedVersion = randomLongBetween(0L, leaderAliasesVersion.get());
followerAliasesVersion.updateAndGet(curr -> Math.max(curr, fetchedVersion));
threadPool.generic().execute(() -> {
handler.accept(fetchedVersion);
updates.arriveAndDeregister();
});
}
@Override
protected void innerSendBulkShardOperationsRequest(String followerHistoryUUID,
List<Translog.Operation> operations,
long leaderMaxSeqNoOfUpdatesOrDeletes,
Consumer<BulkShardOperationsResponse> handler,
Consumer<Exception> errorHandler) {
}
@Override
protected void innerSendShardChangesRequest(long from, int maxOperationCount,
Consumer<ShardChangesAction.Response> handler,
Consumer<Exception> errorHandler) {
}
@Override
protected Scheduler.Cancellable scheduleBackgroundRetentionLeaseRenewal(LongSupplier followerGlobalCheckpoint) {
return null;
}
@Override
synchronized void coordinateReads() {
}
};
int responses = between(10, 5000);
for (int i = 0; i < responses; i++) {
ShardChangesAction.Response response = new ShardChangesAction.Response(
leaderMappingVersion.addAndGet(between(0, Integer.MAX_VALUE)),
leaderSettingsVersion.addAndGet(between(0, Integer.MAX_VALUE)),
leaderAliasesVersion.addAndGet(between(0, Integer.MAX_VALUE)),
SequenceNumbers.NO_OPS_PERFORMED,
SequenceNumbers.NO_OPS_PERFORMED,
-1,
new Translog.Operation[0],
randomLong()
);
shardFollowNodeTask.handleReadResponse(0, -1, response);
}
try {
updates.arriveAndAwaitAdvance();
final ShardFollowNodeTaskStatus status = shardFollowNodeTask.getStatus();
assertThat(status.followerMappingVersion(), equalTo(followerMappingVersion.get()));
assertThat(status.followerSettingsVersion(), equalTo(followerSettingsVersion.get()));
assertThat(status.followerAliasesVersion(), equalTo(followerAliasesVersion.get()));
} finally {
terminate(threadPool);
}
}
static final class ShardFollowTaskParams { static final class ShardFollowTaskParams {
private String remoteCluster = null; private String remoteCluster = null;