diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/DefaultCheckpointProvider.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/DefaultCheckpointProvider.java index e624f2e6270..87614e8a333 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/DefaultCheckpointProvider.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/DefaultCheckpointProvider.java @@ -20,11 +20,11 @@ import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.client.Client; import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.xpack.core.ClientHelper; -import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerPosition; import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpoint; import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpointStats; import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpointingInfo; import org.elasticsearch.xpack.core.transform.transforms.TransformConfig; +import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerPosition; import org.elasticsearch.xpack.core.transform.transforms.TransformProgress; import org.elasticsearch.xpack.transform.notifications.TransformAuditor; import org.elasticsearch.xpack.transform.persistence.TransformConfigManager; @@ -188,14 +188,12 @@ public class DefaultCheckpointProvider implements CheckpointProvider { if (checkpointsByIndex.containsKey(indexName)) { // we have already seen this index, just check/add shards TreeMap checkpoints = checkpointsByIndex.get(indexName); - if (checkpoints.containsKey(shard.getShardRouting().getId())) { - // there is already a checkpoint entry for this index/shard combination, check if they match - if (checkpoints.get(shard.getShardRouting().getId()) != globalCheckpoint) { - throw new CheckpointException("Global checkpoints mismatch for index [" + indexName + "] between shards of id [" - + shard.getShardRouting().getId() + "]"); - } - } else { - // 1st time we see this shard for this index, add the entry for the shard + // 1st time we see this shard for this index, add the entry for the shard + // or there is already a checkpoint entry for this index/shard combination + // but with a higher global checkpoint. This is by design(not a problem) and + // we take the higher value + if (checkpoints.containsKey(shard.getShardRouting().getId()) == false + || checkpoints.get(shard.getShardRouting().getId()) < globalCheckpoint) { checkpoints.put(shard.getShardRouting().getId(), globalCheckpoint); } } else { diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/checkpoint/TransformsCheckpointServiceTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/checkpoint/TransformsCheckpointServiceTests.java index d343dd5e006..d62a242d8fe 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/checkpoint/TransformsCheckpointServiceTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/checkpoint/TransformsCheckpointServiceTests.java @@ -43,8 +43,6 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Set; -import static org.hamcrest.Matchers.containsString; - public class TransformsCheckpointServiceTests extends ESTestCase { public void testExtractIndexCheckpoints() { @@ -104,11 +102,15 @@ public class TransformsCheckpointServiceTests extends ESTestCase { ShardStats[] shardStatsArray = createRandomShardStats(expectedCheckpoints, indices, randomBoolean(), true, false); - // fail - CheckpointException e = expectThrows(CheckpointException.class, - () -> DefaultCheckpointProvider.extractIndexCheckPoints(shardStatsArray, indices)); + Map checkpoints = DefaultCheckpointProvider.extractIndexCheckPoints(shardStatsArray, indices); - assertThat(e.getMessage(), containsString("Global checkpoints mismatch")); + assertEquals(expectedCheckpoints.size(), checkpoints.size()); + assertEquals(expectedCheckpoints.keySet(), checkpoints.keySet()); + + // global checkpoints should be max() of all global checkpoints + for (Entry entry : expectedCheckpoints.entrySet()) { + assertArrayEquals(entry.getValue(), checkpoints.get(entry.getKey())); + } } /** @@ -176,8 +178,8 @@ public class TransformsCheckpointServiceTests extends ESTestCase { } // SeqNoStats asserts that checkpoints are logical - long localCheckpoint = randomLongBetween(0L, 100000000L); - long globalCheckpoint = randomBoolean() ? localCheckpoint : randomLongBetween(0L, 100000000L); + long localCheckpoint = randomLongBetween(100L, 100000000L); + long globalCheckpoint = randomBoolean() ? localCheckpoint : randomLongBetween(100L, 100000000L); long maxSeqNo = Math.max(localCheckpoint, globalCheckpoint); SeqNoStats validSeqNoStats = null; @@ -221,7 +223,7 @@ public class TransformsCheckpointServiceTests extends ESTestCase { if (inconsistentReplica == replica) { // overwrite SeqNoStats invalidSeqNoStats = - new SeqNoStats(maxSeqNo, localCheckpoint, globalCheckpoint + randomLongBetween(10L, 100L)); + new SeqNoStats(maxSeqNo, localCheckpoint, globalCheckpoint - randomLongBetween(10L, 100L)); shardStats.add( new ShardStats(shardRouting, new ShardPath(false, path, path, shardId), stats, null, invalidSeqNoStats, null));