diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/Transform.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/Transform.java index 41e36f2a021..a4df07f5f1d 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/Transform.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/Transform.java @@ -253,7 +253,13 @@ public class Transform extends Plugin implements ActionPlugin, PersistentTaskPlu TransformConfigManager configManager = new IndexBasedTransformConfigManager(client, xContentRegistry); TransformAuditor auditor = new TransformAuditor(client, clusterService.getNodeName()); - TransformCheckpointService checkpointService = new TransformCheckpointService(client, configManager, auditor); + TransformCheckpointService checkpointService = new TransformCheckpointService( + client, + settings, + clusterService, + configManager, + auditor + ); SchedulerEngine scheduler = new SchedulerEngine(settings, Clock.systemUTC()); transformServices.set(new TransformServices(configManager, checkpointService, auditor, scheduler)); diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/DefaultCheckpointProvider.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/DefaultCheckpointProvider.java index a7bda6886fd..63205fd44ca 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/DefaultCheckpointProvider.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/DefaultCheckpointProvider.java @@ -16,9 +16,11 @@ import org.elasticsearch.action.admin.indices.get.GetIndexRequest; import org.elasticsearch.action.admin.indices.stats.IndicesStatsAction; import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest; import org.elasticsearch.action.admin.indices.stats.ShardStats; +import org.elasticsearch.action.support.GroupedActionListener; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.client.Client; import org.elasticsearch.common.util.set.Sets; +import org.elasticsearch.transport.RemoteClusterService; import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpoint; import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpointingInfo; @@ -26,16 +28,20 @@ import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpointingI import org.elasticsearch.xpack.core.transform.transforms.TransformConfig; import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerPosition; import org.elasticsearch.xpack.core.transform.transforms.TransformProgress; +import org.elasticsearch.xpack.transform.checkpoint.RemoteClusterResolver.ResolvedIndices; import org.elasticsearch.xpack.transform.notifications.TransformAuditor; import org.elasticsearch.xpack.transform.persistence.TransformConfigManager; import java.time.Instant; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.TreeMap; +import java.util.stream.Collectors; public class DefaultCheckpointProvider implements CheckpointProvider { @@ -45,17 +51,20 @@ public class DefaultCheckpointProvider implements CheckpointProvider { private static final Logger logger = LogManager.getLogger(DefaultCheckpointProvider.class); protected final Client client; + protected final RemoteClusterResolver remoteClusterResolver; protected final TransformConfigManager transformConfigManager; protected final TransformAuditor transformAuditor; protected final TransformConfig transformConfig; public DefaultCheckpointProvider( final Client client, + final RemoteClusterResolver remoteClusterResolver, final TransformConfigManager transformConfigManager, final TransformAuditor transformAuditor, final TransformConfig transformConfig ) { this.client = client; + this.remoteClusterResolver = remoteClusterResolver; this.transformConfigManager = transformConfigManager; this.transformAuditor = transformAuditor; this.transformConfig = transformConfig; @@ -84,13 +93,61 @@ public class DefaultCheckpointProvider implements CheckpointProvider { } protected void getIndexCheckpoints(ActionListener> listener) { + try { + ResolvedIndices resolvedIndexes = remoteClusterResolver.resolve(transformConfig.getSource().getIndex()); + ActionListener> groupedListener = listener; + + if (resolvedIndexes.numClusters() > 1) { + ActionListener>> mergeMapsListener = ActionListener.wrap(indexCheckpoints -> { + listener.onResponse( + indexCheckpoints.stream() + .flatMap(m -> m.entrySet().stream()) + .collect(Collectors.toMap(entry -> entry.getKey(), entry -> entry.getValue())) + ); + }, listener::onFailure); + + groupedListener = new GroupedActionListener<>(mergeMapsListener, resolvedIndexes.numClusters()); + } + + if (resolvedIndexes.getLocalIndices().isEmpty() == false) { + getCheckpointsFromOneCluster( + client, + transformConfig.getHeaders(), + resolvedIndexes.getLocalIndices().toArray(new String[0]), + RemoteClusterService.LOCAL_CLUSTER_GROUP_KEY, + groupedListener + ); + } + + for (Map.Entry> remoteIndex : resolvedIndexes.getRemoteIndicesPerClusterAlias().entrySet()) { + Client remoteClient = client.getRemoteClusterClient(remoteIndex.getKey()); + getCheckpointsFromOneCluster( + remoteClient, + transformConfig.getHeaders(), + remoteIndex.getValue().toArray(new String[0]), + remoteIndex.getKey() + RemoteClusterService.REMOTE_CLUSTER_INDEX_SEPARATOR, + groupedListener + ); + } + } catch (Exception e) { + listener.onFailure(e); + } + } + + private static void getCheckpointsFromOneCluster( + Client client, + Map headers, + String[] indices, + String prefix, + ActionListener> listener + ) { // 1st get index to see the indexes the user has access to - GetIndexRequest getIndexRequest = new GetIndexRequest().indices(transformConfig.getSource().getIndex()) + GetIndexRequest getIndexRequest = new GetIndexRequest().indices(indices) .features(new GetIndexRequest.Feature[0]) .indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN); ClientHelper.executeWithHeadersAsync( - transformConfig.getHeaders(), + headers, ClientHelper.TRANSFORM_ORIGIN, client, GetIndexAction.INSTANCE, @@ -104,23 +161,20 @@ public class DefaultCheckpointProvider implements CheckpointProvider { client, ClientHelper.TRANSFORM_ORIGIN, IndicesStatsAction.INSTANCE, - new IndicesStatsRequest().indices(transformConfig.getSource().getIndex()) - .clear() - .indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN), + new IndicesStatsRequest().indices(indices).clear().indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN), ActionListener.wrap(response -> { if (response.getFailedShards() != 0) { listener.onFailure(new CheckpointException("Source has [" + response.getFailedShards() + "] failed shards")); return; } - - listener.onResponse(extractIndexCheckPoints(response.getShards(), userIndices)); + listener.onResponse(extractIndexCheckPoints(response.getShards(), userIndices, prefix)); }, e -> listener.onFailure(new CheckpointException("Failed to create checkpoint", e))) ); }, e -> listener.onFailure(new CheckpointException("Failed to create checkpoint", e))) ); } - static Map extractIndexCheckPoints(ShardStats[] shards, Set userIndices) { + static Map extractIndexCheckPoints(ShardStats[] shards, Set userIndices, String prefix) { Map> checkpointsByIndex = new TreeMap<>(); for (ShardStats shard : shards) { @@ -129,9 +183,10 @@ public class DefaultCheckpointProvider implements CheckpointProvider { if (userIndices.contains(indexName)) { // SeqNoStats could be `null`, assume the global checkpoint to be -1 in this case long globalCheckpoint = shard.getSeqNoStats() == null ? -1L : shard.getSeqNoStats().getGlobalCheckpoint(); - if (checkpointsByIndex.containsKey(indexName)) { + String fullIndexName = prefix + indexName; + if (checkpointsByIndex.containsKey(fullIndexName)) { // we have already seen this index, just check/add shards - TreeMap checkpoints = checkpointsByIndex.get(indexName); + TreeMap checkpoints = checkpointsByIndex.get(fullIndexName); // 1st time we see this shard for this index, add the entry for the shard // or there is already a checkpoint entry for this index/shard combination // but with a higher global checkpoint. This is by design(not a problem) and @@ -142,8 +197,8 @@ public class DefaultCheckpointProvider implements CheckpointProvider { } } else { // 1st time we see this index, create an entry for the index and add the shard checkpoint - checkpointsByIndex.put(indexName, new TreeMap<>()); - checkpointsByIndex.get(indexName).put(shard.getShardRouting().getId(), globalCheckpoint); + checkpointsByIndex.put(fullIndexName, new TreeMap<>()); + checkpointsByIndex.get(fullIndexName).put(shard.getShardRouting().getId(), globalCheckpoint); } } } diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/RemoteClusterResolver.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/RemoteClusterResolver.java new file mode 100644 index 00000000000..2b74789763e --- /dev/null +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/RemoteClusterResolver.java @@ -0,0 +1,70 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.transform.checkpoint; + +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.transport.RemoteClusterAware; +import org.elasticsearch.transport.RemoteConnectionStrategy; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CopyOnWriteArraySet; + +/** + * Maintain a list of remote clusters (aliases) and provide the ability to resolve. + */ +class RemoteClusterResolver extends RemoteClusterAware { + + private final CopyOnWriteArraySet clusters; + + class ResolvedIndices { + private final Map> remoteIndicesPerClusterAlias; + private final List localIndices; + + ResolvedIndices(Map> remoteIndicesPerClusterAlias, List localIndices) { + this.localIndices = localIndices; + this.remoteIndicesPerClusterAlias = remoteIndicesPerClusterAlias; + } + + public Map> getRemoteIndicesPerClusterAlias() { + return remoteIndicesPerClusterAlias; + } + + public List getLocalIndices() { + return localIndices; + } + + public int numClusters() { + return remoteIndicesPerClusterAlias.size() + (localIndices.isEmpty() ? 0 : 1); + } + } + + RemoteClusterResolver(Settings settings, ClusterSettings clusterSettings) { + super(settings); + clusters = new CopyOnWriteArraySet<>(getEnabledRemoteClusters(settings)); + listenForUpdates(clusterSettings); + } + + @Override + protected void updateRemoteCluster(String clusterAlias, Settings settings) { + if (RemoteConnectionStrategy.isConnectionEnabled(clusterAlias, settings)) { + clusters.add(clusterAlias); + } else { + clusters.remove(clusterAlias); + } + } + + ResolvedIndices resolve(String... indices) { + // 7.x workaround, see gh#40419 + Map> resolvedClusterIndices = groupClusterIndices(clusters, indices, i -> false); + List localIndices = resolvedClusterIndices.getOrDefault(LOCAL_CLUSTER_GROUP_KEY, Collections.emptyList()); + resolvedClusterIndices.remove(LOCAL_CLUSTER_GROUP_KEY); + return new ResolvedIndices(resolvedClusterIndices, localIndices); + } +} diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/TimeBasedCheckpointProvider.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/TimeBasedCheckpointProvider.java index 96f2ff181cf..552a1e5e5c8 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/TimeBasedCheckpointProvider.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/TimeBasedCheckpointProvider.java @@ -32,11 +32,12 @@ public class TimeBasedCheckpointProvider extends DefaultCheckpointProvider { TimeBasedCheckpointProvider( final Client client, + final RemoteClusterResolver remoteClusterResolver, final TransformConfigManager transformConfigManager, final TransformAuditor transformAuditor, final TransformConfig transformConfig ) { - super(client, transformConfigManager, transformAuditor, transformConfig); + super(client, remoteClusterResolver, transformConfigManager, transformAuditor, transformConfig); timeSyncConfig = (TimeSyncConfig) transformConfig.getSyncConfig(); } diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/TransformCheckpointService.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/TransformCheckpointService.java index 64faf414625..492073b111c 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/TransformCheckpointService.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/TransformCheckpointService.java @@ -10,6 +10,8 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.action.ActionListener; import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.xpack.core.transform.transforms.TimeSyncConfig; import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpointingInfo.TransformCheckpointingInfoBuilder; import org.elasticsearch.xpack.core.transform.transforms.TransformConfig; @@ -33,23 +35,33 @@ public class TransformCheckpointService { private final Client client; private final TransformConfigManager transformConfigManager; private final TransformAuditor transformAuditor; + private final RemoteClusterResolver remoteClusterResolver; public TransformCheckpointService( final Client client, + final Settings settings, + final ClusterService clusterService, final TransformConfigManager transformConfigManager, TransformAuditor transformAuditor ) { this.client = client; this.transformConfigManager = transformConfigManager; this.transformAuditor = transformAuditor; + this.remoteClusterResolver = new RemoteClusterResolver(settings, clusterService.getClusterSettings()); } public CheckpointProvider getCheckpointProvider(final TransformConfig transformConfig) { if (transformConfig.getSyncConfig() instanceof TimeSyncConfig) { - return new TimeBasedCheckpointProvider(client, transformConfigManager, transformAuditor, transformConfig); + return new TimeBasedCheckpointProvider( + client, + remoteClusterResolver, + transformConfigManager, + transformAuditor, + transformConfig + ); } - return new DefaultCheckpointProvider(client, transformConfigManager, transformAuditor, transformConfig); + return new DefaultCheckpointProvider(client, remoteClusterResolver, transformConfigManager, transformAuditor, transformConfig); } /** @@ -82,5 +94,4 @@ public class TransformCheckpointService { listener.onFailure(new CheckpointException("Failed to retrieve configuration", transformError)); })); } - } diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/checkpoint/DefaultCheckpointProviderTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/checkpoint/DefaultCheckpointProviderTests.java index 02fa4765ff6..b7eca071085 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/checkpoint/DefaultCheckpointProviderTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/checkpoint/DefaultCheckpointProviderTests.java @@ -11,6 +11,8 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.client.Client; import org.elasticsearch.common.logging.Loggers; +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.MockLogAppender; @@ -48,6 +50,7 @@ public class DefaultCheckpointProviderTests extends ESTestCase { DefaultCheckpointProvider provider = new DefaultCheckpointProvider( client, + new RemoteClusterResolver(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)), transformConfigManager, transformAuditor, transformConfig @@ -92,6 +95,7 @@ public class DefaultCheckpointProviderTests extends ESTestCase { DefaultCheckpointProvider provider = new DefaultCheckpointProvider( client, + new RemoteClusterResolver(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)), transformConfigManager, transformAuditor, transformConfig @@ -151,6 +155,7 @@ public class DefaultCheckpointProviderTests extends ESTestCase { DefaultCheckpointProvider provider = new DefaultCheckpointProvider( client, + new RemoteClusterResolver(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)), transformConfigManager, transformAuditor, transformConfig diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/checkpoint/TransformCheckpointServiceNodeTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/checkpoint/TransformCheckpointServiceNodeTests.java index 7af774f4a6f..1552178b7dc 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/checkpoint/TransformCheckpointServiceNodeTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/checkpoint/TransformCheckpointServiceNodeTests.java @@ -19,7 +19,10 @@ import org.elasticsearch.action.admin.indices.stats.ShardStats; import org.elasticsearch.cluster.routing.RecoverySource; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.UnassignedInfo; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.UUIDs; +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.Index; import org.elasticsearch.index.cache.query.QueryCacheStats; import org.elasticsearch.index.cache.request.RequestCacheStats; @@ -138,7 +141,13 @@ public class TransformCheckpointServiceNodeTests extends TransformSingleNodeTest // use a mock for the checkpoint service TransformAuditor mockAuditor = mock(TransformAuditor.class); - transformCheckpointService = new TransformCheckpointService(mockClientForCheckpointing, transformsConfigManager, mockAuditor); + transformCheckpointService = new TransformCheckpointService( + mockClientForCheckpointing, + Settings.EMPTY, + new ClusterService(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null), + transformsConfigManager, + mockAuditor + ); } @AfterClass diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/checkpoint/TransformsCheckpointServiceTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/checkpoint/TransformsCheckpointServiceTests.java index d62a242d8fe..d4b30eb8330 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/checkpoint/TransformsCheckpointServiceTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/checkpoint/TransformsCheckpointServiceTests.java @@ -51,7 +51,7 @@ public class TransformsCheckpointServiceTests extends ESTestCase { ShardStats[] shardStatsArray = createRandomShardStats(expectedCheckpoints, indices, false, false, false); - Map checkpoints = DefaultCheckpointProvider.extractIndexCheckPoints(shardStatsArray, indices); + Map checkpoints = DefaultCheckpointProvider.extractIndexCheckPoints(shardStatsArray, indices, ""); assertEquals(expectedCheckpoints.size(), checkpoints.size()); assertEquals(expectedCheckpoints.keySet(), checkpoints.keySet()); @@ -68,7 +68,7 @@ public class TransformsCheckpointServiceTests extends ESTestCase { ShardStats[] shardStatsArray = createRandomShardStats(expectedCheckpoints, indices, false, false, true); - Map checkpoints = DefaultCheckpointProvider.extractIndexCheckPoints(shardStatsArray, indices); + Map checkpoints = DefaultCheckpointProvider.extractIndexCheckPoints(shardStatsArray, indices, ""); assertEquals(expectedCheckpoints.size(), checkpoints.size()); assertEquals(expectedCheckpoints.keySet(), checkpoints.keySet()); @@ -85,7 +85,7 @@ public class TransformsCheckpointServiceTests extends ESTestCase { ShardStats[] shardStatsArray = createRandomShardStats(expectedCheckpoints, indices, true, false, false); - Map checkpoints = DefaultCheckpointProvider.extractIndexCheckPoints(shardStatsArray, indices); + Map checkpoints = DefaultCheckpointProvider.extractIndexCheckPoints(shardStatsArray, indices, ""); assertEquals(expectedCheckpoints.size(), checkpoints.size()); assertEquals(expectedCheckpoints.keySet(), checkpoints.keySet()); @@ -102,7 +102,7 @@ public class TransformsCheckpointServiceTests extends ESTestCase { ShardStats[] shardStatsArray = createRandomShardStats(expectedCheckpoints, indices, randomBoolean(), true, false); - Map checkpoints = DefaultCheckpointProvider.extractIndexCheckPoints(shardStatsArray, indices); + Map checkpoints = DefaultCheckpointProvider.extractIndexCheckPoints(shardStatsArray, indices, ""); assertEquals(expectedCheckpoints.size(), checkpoints.size()); assertEquals(expectedCheckpoints.keySet(), checkpoints.keySet()); @@ -142,8 +142,13 @@ public class TransformsCheckpointServiceTests extends ESTestCase { * @param missingSeqNoStats whether some indices miss SeqNoStats * @return array of ShardStats */ - private static ShardStats[] createRandomShardStats(Map expectedCheckpoints, Set userIndices, - boolean skipPrimaries, boolean inconsistentGlobalCheckpoints, boolean missingSeqNoStats) { + private static ShardStats[] createRandomShardStats( + Map expectedCheckpoints, + Set userIndices, + boolean skipPrimaries, + boolean inconsistentGlobalCheckpoints, + boolean missingSeqNoStats + ) { // always create the full list List indices = new ArrayList<>(); @@ -192,15 +197,17 @@ public class TransformsCheckpointServiceTests extends ESTestCase { checkpoints.add(globalCheckpoint); } - for (int replica = 0; replica < numShardCopies; replica++) { + for (int replica = 0; replica < numShardCopies; replica++) { ShardId shardId = new ShardId(index, shardIndex); boolean primary = (replica == primaryShard); Path path = createTempDir().resolve("indices").resolve(index.getUUID()).resolve(String.valueOf(shardIndex)); - ShardRouting shardRouting = ShardRouting.newUnassigned(shardId, primary, + ShardRouting shardRouting = ShardRouting.newUnassigned( + shardId, + primary, primary ? RecoverySource.EmptyStoreRecoverySource.INSTANCE : PeerRecoverySource.INSTANCE, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, null) - ); + ); shardRouting = shardRouting.initialize("node-0", null, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE); shardRouting = shardRouting.moveToStarted(); @@ -222,15 +229,18 @@ public class TransformsCheckpointServiceTests extends ESTestCase { if (inconsistentReplica == replica) { // overwrite - SeqNoStats invalidSeqNoStats = - new SeqNoStats(maxSeqNo, localCheckpoint, globalCheckpoint - randomLongBetween(10L, 100L)); + SeqNoStats invalidSeqNoStats = new SeqNoStats( + maxSeqNo, + localCheckpoint, + globalCheckpoint - randomLongBetween(10L, 100L) + ); shardStats.add( - new ShardStats(shardRouting, - new ShardPath(false, path, path, shardId), stats, null, invalidSeqNoStats, null)); + new ShardStats(shardRouting, new ShardPath(false, path, path, shardId), stats, null, invalidSeqNoStats, null) + ); } else { shardStats.add( - new ShardStats(shardRouting, - new ShardPath(false, path, path, shardId), stats, null, validSeqNoStats, null)); + new ShardStats(shardRouting, new ShardPath(false, path, path, shardId), stats, null, validSeqNoStats, null) + ); } } } diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutorTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutorTests.java index 09ce4e1269a..9435a3a7d4f 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutorTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutorTests.java @@ -127,6 +127,8 @@ public class TransformPersistentTasksExecutorTests extends ESTestCase { IndexBasedTransformConfigManager transformsConfigManager = new IndexBasedTransformConfigManager(client, xContentRegistry()); TransformCheckpointService transformCheckpointService = new TransformCheckpointService( client, + Settings.EMPTY, + new ClusterService(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null), transformsConfigManager, mockAuditor ); @@ -211,6 +213,8 @@ public class TransformPersistentTasksExecutorTests extends ESTestCase { IndexBasedTransformConfigManager transformsConfigManager = new IndexBasedTransformConfigManager(client, xContentRegistry()); TransformCheckpointService transformCheckpointService = new TransformCheckpointService( client, + Settings.EMPTY, + new ClusterService(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null), transformsConfigManager, mockAuditor ); diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformTaskTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformTaskTests.java index 4b1c2cb2f9b..a4eff607687 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformTaskTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformTaskTests.java @@ -10,6 +10,9 @@ import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.persistent.PersistentTasksService; import org.elasticsearch.rest.RestStatus; @@ -69,8 +72,13 @@ public class TransformTaskTests extends ESTestCase { TransformConfig transformConfig = TransformConfigTests.randomDataFrameTransformConfigWithoutHeaders(); TransformAuditor auditor = new MockTransformAuditor(); TransformConfigManager transformsConfigManager = new InMemoryTransformConfigManager(); - - TransformCheckpointService transformsCheckpointService = new TransformCheckpointService(client, transformsConfigManager, auditor); + TransformCheckpointService transformsCheckpointService = new TransformCheckpointService( + client, + Settings.EMPTY, + new ClusterService(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null), + transformsConfigManager, + auditor + ); TransformState transformState = new TransformState( TransformTaskState.FAILED,