uses remote client(s) to correctly retrieve index checkpoints from remote clusters
This commit is contained in:
parent
f1c5031766
commit
0178c7c5d0
|
@ -253,7 +253,13 @@ public class Transform extends Plugin implements ActionPlugin, PersistentTaskPlu
|
|||
|
||||
TransformConfigManager configManager = new IndexBasedTransformConfigManager(client, xContentRegistry);
|
||||
TransformAuditor auditor = new TransformAuditor(client, clusterService.getNodeName());
|
||||
TransformCheckpointService checkpointService = new TransformCheckpointService(client, configManager, auditor);
|
||||
TransformCheckpointService checkpointService = new TransformCheckpointService(
|
||||
client,
|
||||
settings,
|
||||
clusterService,
|
||||
configManager,
|
||||
auditor
|
||||
);
|
||||
SchedulerEngine scheduler = new SchedulerEngine(settings, Clock.systemUTC());
|
||||
|
||||
transformServices.set(new TransformServices(configManager, checkpointService, auditor, scheduler));
|
||||
|
|
|
@ -16,9 +16,11 @@ import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
|
|||
import org.elasticsearch.action.admin.indices.stats.IndicesStatsAction;
|
||||
import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest;
|
||||
import org.elasticsearch.action.admin.indices.stats.ShardStats;
|
||||
import org.elasticsearch.action.support.GroupedActionListener;
|
||||
import org.elasticsearch.action.support.IndicesOptions;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.common.util.set.Sets;
|
||||
import org.elasticsearch.transport.RemoteClusterService;
|
||||
import org.elasticsearch.xpack.core.ClientHelper;
|
||||
import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpoint;
|
||||
import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpointingInfo;
|
||||
|
@ -26,16 +28,20 @@ import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpointingI
|
|||
import org.elasticsearch.xpack.core.transform.transforms.TransformConfig;
|
||||
import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerPosition;
|
||||
import org.elasticsearch.xpack.core.transform.transforms.TransformProgress;
|
||||
import org.elasticsearch.xpack.transform.checkpoint.RemoteClusterResolver.ResolvedIndices;
|
||||
import org.elasticsearch.xpack.transform.notifications.TransformAuditor;
|
||||
import org.elasticsearch.xpack.transform.persistence.TransformConfigManager;
|
||||
|
||||
import java.time.Instant;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.TreeMap;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
public class DefaultCheckpointProvider implements CheckpointProvider {
|
||||
|
||||
|
@ -45,17 +51,20 @@ public class DefaultCheckpointProvider implements CheckpointProvider {
|
|||
private static final Logger logger = LogManager.getLogger(DefaultCheckpointProvider.class);
|
||||
|
||||
protected final Client client;
|
||||
protected final RemoteClusterResolver remoteClusterResolver;
|
||||
protected final TransformConfigManager transformConfigManager;
|
||||
protected final TransformAuditor transformAuditor;
|
||||
protected final TransformConfig transformConfig;
|
||||
|
||||
public DefaultCheckpointProvider(
|
||||
final Client client,
|
||||
final RemoteClusterResolver remoteClusterResolver,
|
||||
final TransformConfigManager transformConfigManager,
|
||||
final TransformAuditor transformAuditor,
|
||||
final TransformConfig transformConfig
|
||||
) {
|
||||
this.client = client;
|
||||
this.remoteClusterResolver = remoteClusterResolver;
|
||||
this.transformConfigManager = transformConfigManager;
|
||||
this.transformAuditor = transformAuditor;
|
||||
this.transformConfig = transformConfig;
|
||||
|
@ -84,13 +93,61 @@ public class DefaultCheckpointProvider implements CheckpointProvider {
|
|||
}
|
||||
|
||||
protected void getIndexCheckpoints(ActionListener<Map<String, long[]>> listener) {
|
||||
try {
|
||||
ResolvedIndices resolvedIndexes = remoteClusterResolver.resolve(transformConfig.getSource().getIndex());
|
||||
ActionListener<Map<String, long[]>> groupedListener = listener;
|
||||
|
||||
if (resolvedIndexes.numClusters() > 1) {
|
||||
ActionListener<Collection<Map<String, long[]>>> mergeMapsListener = ActionListener.wrap(indexCheckpoints -> {
|
||||
listener.onResponse(
|
||||
indexCheckpoints.stream()
|
||||
.flatMap(m -> m.entrySet().stream())
|
||||
.collect(Collectors.toMap(entry -> entry.getKey(), entry -> entry.getValue()))
|
||||
);
|
||||
}, listener::onFailure);
|
||||
|
||||
groupedListener = new GroupedActionListener<>(mergeMapsListener, resolvedIndexes.numClusters());
|
||||
}
|
||||
|
||||
if (resolvedIndexes.getLocalIndices().isEmpty() == false) {
|
||||
getCheckpointsFromOneCluster(
|
||||
client,
|
||||
transformConfig.getHeaders(),
|
||||
resolvedIndexes.getLocalIndices().toArray(new String[0]),
|
||||
RemoteClusterService.LOCAL_CLUSTER_GROUP_KEY,
|
||||
groupedListener
|
||||
);
|
||||
}
|
||||
|
||||
for (Map.Entry<String, List<String>> remoteIndex : resolvedIndexes.getRemoteIndicesPerClusterAlias().entrySet()) {
|
||||
Client remoteClient = client.getRemoteClusterClient(remoteIndex.getKey());
|
||||
getCheckpointsFromOneCluster(
|
||||
remoteClient,
|
||||
transformConfig.getHeaders(),
|
||||
remoteIndex.getValue().toArray(new String[0]),
|
||||
remoteIndex.getKey() + RemoteClusterService.REMOTE_CLUSTER_INDEX_SEPARATOR,
|
||||
groupedListener
|
||||
);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
listener.onFailure(e);
|
||||
}
|
||||
}
|
||||
|
||||
private static void getCheckpointsFromOneCluster(
|
||||
Client client,
|
||||
Map<String, String> headers,
|
||||
String[] indices,
|
||||
String prefix,
|
||||
ActionListener<Map<String, long[]>> listener
|
||||
) {
|
||||
// 1st get index to see the indexes the user has access to
|
||||
GetIndexRequest getIndexRequest = new GetIndexRequest().indices(transformConfig.getSource().getIndex())
|
||||
GetIndexRequest getIndexRequest = new GetIndexRequest().indices(indices)
|
||||
.features(new GetIndexRequest.Feature[0])
|
||||
.indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN);
|
||||
|
||||
ClientHelper.executeWithHeadersAsync(
|
||||
transformConfig.getHeaders(),
|
||||
headers,
|
||||
ClientHelper.TRANSFORM_ORIGIN,
|
||||
client,
|
||||
GetIndexAction.INSTANCE,
|
||||
|
@ -104,23 +161,20 @@ public class DefaultCheckpointProvider implements CheckpointProvider {
|
|||
client,
|
||||
ClientHelper.TRANSFORM_ORIGIN,
|
||||
IndicesStatsAction.INSTANCE,
|
||||
new IndicesStatsRequest().indices(transformConfig.getSource().getIndex())
|
||||
.clear()
|
||||
.indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN),
|
||||
new IndicesStatsRequest().indices(indices).clear().indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN),
|
||||
ActionListener.wrap(response -> {
|
||||
if (response.getFailedShards() != 0) {
|
||||
listener.onFailure(new CheckpointException("Source has [" + response.getFailedShards() + "] failed shards"));
|
||||
return;
|
||||
}
|
||||
|
||||
listener.onResponse(extractIndexCheckPoints(response.getShards(), userIndices));
|
||||
listener.onResponse(extractIndexCheckPoints(response.getShards(), userIndices, prefix));
|
||||
}, e -> listener.onFailure(new CheckpointException("Failed to create checkpoint", e)))
|
||||
);
|
||||
}, e -> listener.onFailure(new CheckpointException("Failed to create checkpoint", e)))
|
||||
);
|
||||
}
|
||||
|
||||
static Map<String, long[]> extractIndexCheckPoints(ShardStats[] shards, Set<String> userIndices) {
|
||||
static Map<String, long[]> extractIndexCheckPoints(ShardStats[] shards, Set<String> userIndices, String prefix) {
|
||||
Map<String, TreeMap<Integer, Long>> checkpointsByIndex = new TreeMap<>();
|
||||
|
||||
for (ShardStats shard : shards) {
|
||||
|
@ -129,9 +183,10 @@ public class DefaultCheckpointProvider implements CheckpointProvider {
|
|||
if (userIndices.contains(indexName)) {
|
||||
// SeqNoStats could be `null`, assume the global checkpoint to be -1 in this case
|
||||
long globalCheckpoint = shard.getSeqNoStats() == null ? -1L : shard.getSeqNoStats().getGlobalCheckpoint();
|
||||
if (checkpointsByIndex.containsKey(indexName)) {
|
||||
String fullIndexName = prefix + indexName;
|
||||
if (checkpointsByIndex.containsKey(fullIndexName)) {
|
||||
// we have already seen this index, just check/add shards
|
||||
TreeMap<Integer, Long> checkpoints = checkpointsByIndex.get(indexName);
|
||||
TreeMap<Integer, Long> checkpoints = checkpointsByIndex.get(fullIndexName);
|
||||
// 1st time we see this shard for this index, add the entry for the shard
|
||||
// or there is already a checkpoint entry for this index/shard combination
|
||||
// but with a higher global checkpoint. This is by design(not a problem) and
|
||||
|
@ -142,8 +197,8 @@ public class DefaultCheckpointProvider implements CheckpointProvider {
|
|||
}
|
||||
} else {
|
||||
// 1st time we see this index, create an entry for the index and add the shard checkpoint
|
||||
checkpointsByIndex.put(indexName, new TreeMap<>());
|
||||
checkpointsByIndex.get(indexName).put(shard.getShardRouting().getId(), globalCheckpoint);
|
||||
checkpointsByIndex.put(fullIndexName, new TreeMap<>());
|
||||
checkpointsByIndex.get(fullIndexName).put(shard.getShardRouting().getId(), globalCheckpoint);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,70 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.xpack.transform.checkpoint;
|
||||
|
||||
import org.elasticsearch.common.settings.ClusterSettings;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.transport.RemoteClusterAware;
|
||||
import org.elasticsearch.transport.RemoteConnectionStrategy;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.CopyOnWriteArraySet;
|
||||
|
||||
/**
|
||||
* Maintain a list of remote clusters (aliases) and provide the ability to resolve.
|
||||
*/
|
||||
class RemoteClusterResolver extends RemoteClusterAware {
|
||||
|
||||
private final CopyOnWriteArraySet<String> clusters;
|
||||
|
||||
class ResolvedIndices {
|
||||
private final Map<String, List<String>> remoteIndicesPerClusterAlias;
|
||||
private final List<String> localIndices;
|
||||
|
||||
ResolvedIndices(Map<String, List<String>> remoteIndicesPerClusterAlias, List<String> localIndices) {
|
||||
this.localIndices = localIndices;
|
||||
this.remoteIndicesPerClusterAlias = remoteIndicesPerClusterAlias;
|
||||
}
|
||||
|
||||
public Map<String, List<String>> getRemoteIndicesPerClusterAlias() {
|
||||
return remoteIndicesPerClusterAlias;
|
||||
}
|
||||
|
||||
public List<String> getLocalIndices() {
|
||||
return localIndices;
|
||||
}
|
||||
|
||||
public int numClusters() {
|
||||
return remoteIndicesPerClusterAlias.size() + (localIndices.isEmpty() ? 0 : 1);
|
||||
}
|
||||
}
|
||||
|
||||
RemoteClusterResolver(Settings settings, ClusterSettings clusterSettings) {
|
||||
super(settings);
|
||||
clusters = new CopyOnWriteArraySet<>(getEnabledRemoteClusters(settings));
|
||||
listenForUpdates(clusterSettings);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void updateRemoteCluster(String clusterAlias, Settings settings) {
|
||||
if (RemoteConnectionStrategy.isConnectionEnabled(clusterAlias, settings)) {
|
||||
clusters.add(clusterAlias);
|
||||
} else {
|
||||
clusters.remove(clusterAlias);
|
||||
}
|
||||
}
|
||||
|
||||
ResolvedIndices resolve(String... indices) {
|
||||
// 7.x workaround, see gh#40419
|
||||
Map<String, List<String>> resolvedClusterIndices = groupClusterIndices(clusters, indices, i -> false);
|
||||
List<String> localIndices = resolvedClusterIndices.getOrDefault(LOCAL_CLUSTER_GROUP_KEY, Collections.emptyList());
|
||||
resolvedClusterIndices.remove(LOCAL_CLUSTER_GROUP_KEY);
|
||||
return new ResolvedIndices(resolvedClusterIndices, localIndices);
|
||||
}
|
||||
}
|
|
@ -32,11 +32,12 @@ public class TimeBasedCheckpointProvider extends DefaultCheckpointProvider {
|
|||
|
||||
TimeBasedCheckpointProvider(
|
||||
final Client client,
|
||||
final RemoteClusterResolver remoteClusterResolver,
|
||||
final TransformConfigManager transformConfigManager,
|
||||
final TransformAuditor transformAuditor,
|
||||
final TransformConfig transformConfig
|
||||
) {
|
||||
super(client, transformConfigManager, transformAuditor, transformConfig);
|
||||
super(client, remoteClusterResolver, transformConfigManager, transformAuditor, transformConfig);
|
||||
timeSyncConfig = (TimeSyncConfig) transformConfig.getSyncConfig();
|
||||
}
|
||||
|
||||
|
|
|
@ -10,6 +10,8 @@ import org.apache.logging.log4j.LogManager;
|
|||
import org.apache.logging.log4j.Logger;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.xpack.core.transform.transforms.TimeSyncConfig;
|
||||
import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpointingInfo.TransformCheckpointingInfoBuilder;
|
||||
import org.elasticsearch.xpack.core.transform.transforms.TransformConfig;
|
||||
|
@ -33,23 +35,33 @@ public class TransformCheckpointService {
|
|||
private final Client client;
|
||||
private final TransformConfigManager transformConfigManager;
|
||||
private final TransformAuditor transformAuditor;
|
||||
private final RemoteClusterResolver remoteClusterResolver;
|
||||
|
||||
public TransformCheckpointService(
|
||||
final Client client,
|
||||
final Settings settings,
|
||||
final ClusterService clusterService,
|
||||
final TransformConfigManager transformConfigManager,
|
||||
TransformAuditor transformAuditor
|
||||
) {
|
||||
this.client = client;
|
||||
this.transformConfigManager = transformConfigManager;
|
||||
this.transformAuditor = transformAuditor;
|
||||
this.remoteClusterResolver = new RemoteClusterResolver(settings, clusterService.getClusterSettings());
|
||||
}
|
||||
|
||||
public CheckpointProvider getCheckpointProvider(final TransformConfig transformConfig) {
|
||||
if (transformConfig.getSyncConfig() instanceof TimeSyncConfig) {
|
||||
return new TimeBasedCheckpointProvider(client, transformConfigManager, transformAuditor, transformConfig);
|
||||
return new TimeBasedCheckpointProvider(
|
||||
client,
|
||||
remoteClusterResolver,
|
||||
transformConfigManager,
|
||||
transformAuditor,
|
||||
transformConfig
|
||||
);
|
||||
}
|
||||
|
||||
return new DefaultCheckpointProvider(client, transformConfigManager, transformAuditor, transformConfig);
|
||||
return new DefaultCheckpointProvider(client, remoteClusterResolver, transformConfigManager, transformAuditor, transformConfig);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -82,5 +94,4 @@ public class TransformCheckpointService {
|
|||
listener.onFailure(new CheckpointException("Failed to retrieve configuration", transformError));
|
||||
}));
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -11,6 +11,8 @@ import org.apache.logging.log4j.LogManager;
|
|||
import org.apache.logging.log4j.Logger;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.common.logging.Loggers;
|
||||
import org.elasticsearch.common.settings.ClusterSettings;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.util.set.Sets;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.test.MockLogAppender;
|
||||
|
@ -48,6 +50,7 @@ public class DefaultCheckpointProviderTests extends ESTestCase {
|
|||
|
||||
DefaultCheckpointProvider provider = new DefaultCheckpointProvider(
|
||||
client,
|
||||
new RemoteClusterResolver(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)),
|
||||
transformConfigManager,
|
||||
transformAuditor,
|
||||
transformConfig
|
||||
|
@ -92,6 +95,7 @@ public class DefaultCheckpointProviderTests extends ESTestCase {
|
|||
|
||||
DefaultCheckpointProvider provider = new DefaultCheckpointProvider(
|
||||
client,
|
||||
new RemoteClusterResolver(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)),
|
||||
transformConfigManager,
|
||||
transformAuditor,
|
||||
transformConfig
|
||||
|
@ -151,6 +155,7 @@ public class DefaultCheckpointProviderTests extends ESTestCase {
|
|||
|
||||
DefaultCheckpointProvider provider = new DefaultCheckpointProvider(
|
||||
client,
|
||||
new RemoteClusterResolver(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)),
|
||||
transformConfigManager,
|
||||
transformAuditor,
|
||||
transformConfig
|
||||
|
|
|
@ -19,7 +19,10 @@ import org.elasticsearch.action.admin.indices.stats.ShardStats;
|
|||
import org.elasticsearch.cluster.routing.RecoverySource;
|
||||
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||
import org.elasticsearch.cluster.routing.UnassignedInfo;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.UUIDs;
|
||||
import org.elasticsearch.common.settings.ClusterSettings;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.index.Index;
|
||||
import org.elasticsearch.index.cache.query.QueryCacheStats;
|
||||
import org.elasticsearch.index.cache.request.RequestCacheStats;
|
||||
|
@ -138,7 +141,13 @@ public class TransformCheckpointServiceNodeTests extends TransformSingleNodeTest
|
|||
|
||||
// use a mock for the checkpoint service
|
||||
TransformAuditor mockAuditor = mock(TransformAuditor.class);
|
||||
transformCheckpointService = new TransformCheckpointService(mockClientForCheckpointing, transformsConfigManager, mockAuditor);
|
||||
transformCheckpointService = new TransformCheckpointService(
|
||||
mockClientForCheckpointing,
|
||||
Settings.EMPTY,
|
||||
new ClusterService(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null),
|
||||
transformsConfigManager,
|
||||
mockAuditor
|
||||
);
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
|
|
|
@ -51,7 +51,7 @@ public class TransformsCheckpointServiceTests extends ESTestCase {
|
|||
|
||||
ShardStats[] shardStatsArray = createRandomShardStats(expectedCheckpoints, indices, false, false, false);
|
||||
|
||||
Map<String, long[]> checkpoints = DefaultCheckpointProvider.extractIndexCheckPoints(shardStatsArray, indices);
|
||||
Map<String, long[]> checkpoints = DefaultCheckpointProvider.extractIndexCheckPoints(shardStatsArray, indices, "");
|
||||
|
||||
assertEquals(expectedCheckpoints.size(), checkpoints.size());
|
||||
assertEquals(expectedCheckpoints.keySet(), checkpoints.keySet());
|
||||
|
@ -68,7 +68,7 @@ public class TransformsCheckpointServiceTests extends ESTestCase {
|
|||
|
||||
ShardStats[] shardStatsArray = createRandomShardStats(expectedCheckpoints, indices, false, false, true);
|
||||
|
||||
Map<String, long[]> checkpoints = DefaultCheckpointProvider.extractIndexCheckPoints(shardStatsArray, indices);
|
||||
Map<String, long[]> checkpoints = DefaultCheckpointProvider.extractIndexCheckPoints(shardStatsArray, indices, "");
|
||||
|
||||
assertEquals(expectedCheckpoints.size(), checkpoints.size());
|
||||
assertEquals(expectedCheckpoints.keySet(), checkpoints.keySet());
|
||||
|
@ -85,7 +85,7 @@ public class TransformsCheckpointServiceTests extends ESTestCase {
|
|||
|
||||
ShardStats[] shardStatsArray = createRandomShardStats(expectedCheckpoints, indices, true, false, false);
|
||||
|
||||
Map<String, long[]> checkpoints = DefaultCheckpointProvider.extractIndexCheckPoints(shardStatsArray, indices);
|
||||
Map<String, long[]> checkpoints = DefaultCheckpointProvider.extractIndexCheckPoints(shardStatsArray, indices, "");
|
||||
|
||||
assertEquals(expectedCheckpoints.size(), checkpoints.size());
|
||||
assertEquals(expectedCheckpoints.keySet(), checkpoints.keySet());
|
||||
|
@ -102,7 +102,7 @@ public class TransformsCheckpointServiceTests extends ESTestCase {
|
|||
|
||||
ShardStats[] shardStatsArray = createRandomShardStats(expectedCheckpoints, indices, randomBoolean(), true, false);
|
||||
|
||||
Map<String, long[]> checkpoints = DefaultCheckpointProvider.extractIndexCheckPoints(shardStatsArray, indices);
|
||||
Map<String, long[]> checkpoints = DefaultCheckpointProvider.extractIndexCheckPoints(shardStatsArray, indices, "");
|
||||
|
||||
assertEquals(expectedCheckpoints.size(), checkpoints.size());
|
||||
assertEquals(expectedCheckpoints.keySet(), checkpoints.keySet());
|
||||
|
@ -142,8 +142,13 @@ public class TransformsCheckpointServiceTests extends ESTestCase {
|
|||
* @param missingSeqNoStats whether some indices miss SeqNoStats
|
||||
* @return array of ShardStats
|
||||
*/
|
||||
private static ShardStats[] createRandomShardStats(Map<String, long[]> expectedCheckpoints, Set<String> userIndices,
|
||||
boolean skipPrimaries, boolean inconsistentGlobalCheckpoints, boolean missingSeqNoStats) {
|
||||
private static ShardStats[] createRandomShardStats(
|
||||
Map<String, long[]> expectedCheckpoints,
|
||||
Set<String> userIndices,
|
||||
boolean skipPrimaries,
|
||||
boolean inconsistentGlobalCheckpoints,
|
||||
boolean missingSeqNoStats
|
||||
) {
|
||||
|
||||
// always create the full list
|
||||
List<Index> indices = new ArrayList<>();
|
||||
|
@ -192,15 +197,17 @@ public class TransformsCheckpointServiceTests extends ESTestCase {
|
|||
checkpoints.add(globalCheckpoint);
|
||||
}
|
||||
|
||||
for (int replica = 0; replica < numShardCopies; replica++) {
|
||||
for (int replica = 0; replica < numShardCopies; replica++) {
|
||||
ShardId shardId = new ShardId(index, shardIndex);
|
||||
boolean primary = (replica == primaryShard);
|
||||
|
||||
Path path = createTempDir().resolve("indices").resolve(index.getUUID()).resolve(String.valueOf(shardIndex));
|
||||
ShardRouting shardRouting = ShardRouting.newUnassigned(shardId, primary,
|
||||
ShardRouting shardRouting = ShardRouting.newUnassigned(
|
||||
shardId,
|
||||
primary,
|
||||
primary ? RecoverySource.EmptyStoreRecoverySource.INSTANCE : PeerRecoverySource.INSTANCE,
|
||||
new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, null)
|
||||
);
|
||||
);
|
||||
shardRouting = shardRouting.initialize("node-0", null, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE);
|
||||
shardRouting = shardRouting.moveToStarted();
|
||||
|
||||
|
@ -222,15 +229,18 @@ public class TransformsCheckpointServiceTests extends ESTestCase {
|
|||
|
||||
if (inconsistentReplica == replica) {
|
||||
// overwrite
|
||||
SeqNoStats invalidSeqNoStats =
|
||||
new SeqNoStats(maxSeqNo, localCheckpoint, globalCheckpoint - randomLongBetween(10L, 100L));
|
||||
SeqNoStats invalidSeqNoStats = new SeqNoStats(
|
||||
maxSeqNo,
|
||||
localCheckpoint,
|
||||
globalCheckpoint - randomLongBetween(10L, 100L)
|
||||
);
|
||||
shardStats.add(
|
||||
new ShardStats(shardRouting,
|
||||
new ShardPath(false, path, path, shardId), stats, null, invalidSeqNoStats, null));
|
||||
new ShardStats(shardRouting, new ShardPath(false, path, path, shardId), stats, null, invalidSeqNoStats, null)
|
||||
);
|
||||
} else {
|
||||
shardStats.add(
|
||||
new ShardStats(shardRouting,
|
||||
new ShardPath(false, path, path, shardId), stats, null, validSeqNoStats, null));
|
||||
new ShardStats(shardRouting, new ShardPath(false, path, path, shardId), stats, null, validSeqNoStats, null)
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -127,6 +127,8 @@ public class TransformPersistentTasksExecutorTests extends ESTestCase {
|
|||
IndexBasedTransformConfigManager transformsConfigManager = new IndexBasedTransformConfigManager(client, xContentRegistry());
|
||||
TransformCheckpointService transformCheckpointService = new TransformCheckpointService(
|
||||
client,
|
||||
Settings.EMPTY,
|
||||
new ClusterService(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null),
|
||||
transformsConfigManager,
|
||||
mockAuditor
|
||||
);
|
||||
|
@ -211,6 +213,8 @@ public class TransformPersistentTasksExecutorTests extends ESTestCase {
|
|||
IndexBasedTransformConfigManager transformsConfigManager = new IndexBasedTransformConfigManager(client, xContentRegistry());
|
||||
TransformCheckpointService transformCheckpointService = new TransformCheckpointService(
|
||||
client,
|
||||
Settings.EMPTY,
|
||||
new ClusterService(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null),
|
||||
transformsConfigManager,
|
||||
mockAuditor
|
||||
);
|
||||
|
|
|
@ -10,6 +10,9 @@ import org.elasticsearch.ElasticsearchStatusException;
|
|||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.settings.ClusterSettings;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.persistent.PersistentTasksService;
|
||||
import org.elasticsearch.rest.RestStatus;
|
||||
|
@ -69,8 +72,13 @@ public class TransformTaskTests extends ESTestCase {
|
|||
TransformConfig transformConfig = TransformConfigTests.randomDataFrameTransformConfigWithoutHeaders();
|
||||
TransformAuditor auditor = new MockTransformAuditor();
|
||||
TransformConfigManager transformsConfigManager = new InMemoryTransformConfigManager();
|
||||
|
||||
TransformCheckpointService transformsCheckpointService = new TransformCheckpointService(client, transformsConfigManager, auditor);
|
||||
TransformCheckpointService transformsCheckpointService = new TransformCheckpointService(
|
||||
client,
|
||||
Settings.EMPTY,
|
||||
new ClusterService(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null),
|
||||
transformsConfigManager,
|
||||
auditor
|
||||
);
|
||||
|
||||
TransformState transformState = new TransformState(
|
||||
TransformTaskState.FAILED,
|
||||
|
|
Loading…
Reference in New Issue