[ML-DataFrame] make checkpointing more robust (#44344) (#44414)

make checkpointing more robust:

- do not let checkpointing fail if indexes got deleted
- treat missing seqNoStats as just created indices (checkpoint 0)
- loglevel: do not treat failed update checks as an error

fixes #43992
This commit is contained in:
Hendrik Muhs 2019-07-16 13:43:13 +02:00 committed by GitHub
parent 096c03945c
commit 6c1f740759
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 86 additions and 46 deletions

View File

@ -279,18 +279,16 @@ public class DataFrameTransformCheckpoint implements Writeable, ToXContentObject
throw new IllegalArgumentException("old checkpoint is newer than new checkpoint");
}
// all old indices must be contained in the new ones but not vice versa
if (newCheckpoint.indicesCheckpoints.keySet().containsAll(oldCheckpoint.indicesCheckpoints.keySet()) == false) {
return -1L;
}
// get the sum of of shard checkpoints
// note: we require shard checkpoints to strictly increase and never decrease
long oldCheckPointSum = 0;
long newCheckPointSum = 0;
for (long[] v : oldCheckpoint.indicesCheckpoints.values()) {
oldCheckPointSum += Arrays.stream(v).sum();
for (Entry<String, long[]> entry : oldCheckpoint.indicesCheckpoints.entrySet()) {
// ignore entries that aren't part of newCheckpoint, e.g. deleted indices
if (newCheckpoint.indicesCheckpoints.containsKey(entry.getKey())) {
oldCheckPointSum += Arrays.stream(entry.getValue()).sum();
}
}
for (long[] v : newCheckpoint.indicesCheckpoints.values()) {

View File

@ -162,9 +162,10 @@ public class DataFrameTransformCheckpointTests extends AbstractSerializingDataFr
checkpointsByIndexNew.remove(checkpointsByIndexNew.firstKey());
assertEquals((indices - 1) * shards * 10L, DataFrameTransformCheckpoint.getBehind(checkpointOld, checkpointTransientNew));
// remove 1st index from new, now old has 1 index more, behind can not be calculated
// remove 1st index from new, now old has 1 index more, which should be ignored
checkpointsByIndexNew.remove(checkpointsByIndexNew.firstKey());
assertEquals(-1L, DataFrameTransformCheckpoint.getBehind(checkpointOld, checkpointTransientNew));
assertEquals((indices - 2) * shards * 10L, DataFrameTransformCheckpoint.getBehind(checkpointOld, checkpointTransientNew));
}
private static Map<String, long[]> randomCheckpointsByIndex() {

View File

@ -14,9 +14,8 @@ import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsAction;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest;
import org.elasticsearch.action.admin.indices.stats.ShardStats;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.Strings;
import org.elasticsearch.index.seqno.SeqNoStats;
import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformCheckpoint;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformCheckpointStats;
@ -94,7 +93,8 @@ public class DataFrameTransformsCheckpointService {
// 1st get index to see the indexes the user has access to
GetIndexRequest getIndexRequest = new GetIndexRequest()
.indices(transformConfig.getSource().getIndex())
.features(new GetIndexRequest.Feature[0]);
.features(new GetIndexRequest.Feature[0])
.indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN);
ClientHelper.executeWithHeadersAsync(transformConfig.getHeaders(), ClientHelper.DATA_FRAME_ORIGIN, client, GetIndexAction.INSTANCE,
getIndexRequest, ActionListener.wrap(getIndexResponse -> {
@ -105,7 +105,8 @@ public class DataFrameTransformsCheckpointService {
IndicesStatsAction.INSTANCE,
new IndicesStatsRequest()
.indices(transformConfig.getSource().getIndex())
.clear(),
.clear()
.indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN),
ActionListener.wrap(
response -> {
if (response.getFailedShards() != 0) {
@ -113,21 +114,18 @@ public class DataFrameTransformsCheckpointService {
new CheckpointException("Source has [" + response.getFailedShards() + "] failed shards"));
return;
}
try {
Map<String, long[]> checkpointsByIndex = extractIndexCheckPoints(response.getShards(), userIndices);
listener.onResponse(new DataFrameTransformCheckpoint(transformConfig.getId(),
timestamp,
checkpoint,
checkpointsByIndex,
timeUpperBound));
} catch (CheckpointException checkpointException) {
listener.onFailure(checkpointException);
}
Map<String, long[]> checkpointsByIndex = extractIndexCheckPoints(response.getShards(), userIndices);
listener.onResponse(new DataFrameTransformCheckpoint(transformConfig.getId(),
timestamp,
checkpoint,
checkpointsByIndex,
timeUpperBound));
},
listener::onFailure
e-> listener.onFailure(new CheckpointException("Failed to create checkpoint", e))
));
},
listener::onFailure
e -> listener.onFailure(new CheckpointException("Failed to create checkpoint", e))
));
}
@ -223,38 +221,44 @@ public class DataFrameTransformsCheckpointService {
for (ShardStats shard : shards) {
String indexName = shard.getShardRouting().getIndexName();
if (userIndices.contains(indexName)) {
SeqNoStats seqNoStats = shard.getSeqNoStats();
// SeqNoStats could be `null`. This indicates that an `AlreadyClosed` exception was thrown somewhere down the stack
// Indicates that the index COULD be closed, or at least that the shard is not fully recovered yet.
if (seqNoStats == null) {
logger.warn("failure gathering checkpoint information for index [{}] as seq_no_stats were null. Shard Stats [{}]",
indexName,
Strings.toString(shard));
throw new CheckpointException(
"Unable to gather checkpoint information for index [" + indexName + "]. seq_no_stats are missing.");
}
// SeqNoStats could be `null`, assume the global checkpoint to be 0 in this case
long globalCheckpoint = shard.getSeqNoStats() == null ? 0 : shard.getSeqNoStats().getGlobalCheckpoint();
if (checkpointsByIndex.containsKey(indexName)) {
// we have already seen this index, just check/add shards
TreeMap<Integer, Long> checkpoints = checkpointsByIndex.get(indexName);
if (checkpoints.containsKey(shard.getShardRouting().getId())) {
// there is already a checkpoint entry for this index/shard combination, check if they match
if (checkpoints.get(shard.getShardRouting().getId()) != shard.getSeqNoStats().getGlobalCheckpoint()) {
if (checkpoints.get(shard.getShardRouting().getId()) != globalCheckpoint) {
throw new CheckpointException("Global checkpoints mismatch for index [" + indexName + "] between shards of id ["
+ shard.getShardRouting().getId() + "]");
}
} else {
// 1st time we see this shard for this index, add the entry for the shard
checkpoints.put(shard.getShardRouting().getId(), shard.getSeqNoStats().getGlobalCheckpoint());
checkpoints.put(shard.getShardRouting().getId(), globalCheckpoint);
}
} else {
// 1st time we see this index, create an entry for the index and add the shard checkpoint
checkpointsByIndex.put(indexName, new TreeMap<>());
checkpointsByIndex.get(indexName).put(shard.getShardRouting().getId(), shard.getSeqNoStats().getGlobalCheckpoint());
checkpointsByIndex.get(indexName).put(shard.getShardRouting().getId(), globalCheckpoint);
}
}
}
// checkpoint extraction is done in 2 steps:
// 1. GetIndexRequest to retrieve the indices the user has access to
// 2. IndicesStatsRequest to retrieve stats about indices
// between 1 and 2 indices could get deleted or created
if (logger.isDebugEnabled()) {
Set<String> userIndicesClone = new HashSet<>(userIndices);
userIndicesClone.removeAll(checkpointsByIndex.keySet());
if (userIndicesClone.isEmpty() == false) {
logger.debug("Original set of user indices contained more indexes [{}]", userIndicesClone);
}
}
// create the final structure
Map<String, long[]> checkpointsByIndexReduced = new TreeMap<>();

View File

@ -764,7 +764,13 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
}
}, e -> {
changed.set(false);
logger.error("failure in update check", e);
logger.warn(
"Failed to detect changes for data frame transform [" + transformId + "], skipping update till next check",
e);
auditor.warning(transformId,
"Failed to detect changes for data frame transform, skipping update till next check. Exception: "
+ e.getMessage());
}), latch));
try {
@ -773,7 +779,11 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
return changed.get();
}
} catch (InterruptedException e) {
logger.error("Failed to check for update", e);
logger.warn("Failed to detect changes for data frame transform [" + transformId + "], skipping update till next check", e);
auditor.warning(transformId,
"Failed to detect changes for data frame transform, skipping update till next check. Exception: "
+ e.getMessage());
}
return false;

View File

@ -52,7 +52,24 @@ public class DataFrameTransformsCheckpointServiceTests extends ESTestCase {
Map<String, long[]> expectedCheckpoints = new HashMap<>();
Set<String> indices = randomUserIndices();
ShardStats[] shardStatsArray = createRandomShardStats(expectedCheckpoints, indices, false, false);
ShardStats[] shardStatsArray = createRandomShardStats(expectedCheckpoints, indices, false, false, false);
Map<String, long[]> checkpoints = DataFrameTransformsCheckpointService.extractIndexCheckPoints(shardStatsArray, indices);
assertEquals(expectedCheckpoints.size(), checkpoints.size());
assertEquals(expectedCheckpoints.keySet(), checkpoints.keySet());
// low-level compare
for (Entry<String, long[]> entry : expectedCheckpoints.entrySet()) {
assertTrue(Arrays.equals(entry.getValue(), checkpoints.get(entry.getKey())));
}
}
public void testExtractIndexCheckpointsMissingSeqNoStats() {
Map<String, long[]> expectedCheckpoints = new HashMap<>();
Set<String> indices = randomUserIndices();
ShardStats[] shardStatsArray = createRandomShardStats(expectedCheckpoints, indices, false, false, true);
Map<String, long[]> checkpoints = DataFrameTransformsCheckpointService.extractIndexCheckPoints(shardStatsArray, indices);
@ -69,7 +86,7 @@ public class DataFrameTransformsCheckpointServiceTests extends ESTestCase {
Map<String, long[]> expectedCheckpoints = new HashMap<>();
Set<String> indices = randomUserIndices();
ShardStats[] shardStatsArray = createRandomShardStats(expectedCheckpoints, indices, true, false);
ShardStats[] shardStatsArray = createRandomShardStats(expectedCheckpoints, indices, true, false, false);
Map<String, long[]> checkpoints = DataFrameTransformsCheckpointService.extractIndexCheckPoints(shardStatsArray, indices);
@ -86,7 +103,7 @@ public class DataFrameTransformsCheckpointServiceTests extends ESTestCase {
Map<String, long[]> expectedCheckpoints = new HashMap<>();
Set<String> indices = randomUserIndices();
ShardStats[] shardStatsArray = createRandomShardStats(expectedCheckpoints, indices, randomBoolean(), true);
ShardStats[] shardStatsArray = createRandomShardStats(expectedCheckpoints, indices, randomBoolean(), true, false);
// fail
CheckpointException e = expectThrows(CheckpointException.class,
@ -120,10 +137,11 @@ public class DataFrameTransformsCheckpointServiceTests extends ESTestCase {
* @param userIndices set of indices that are visible
* @param skipPrimaries whether some shards do not have a primary shard at random
* @param inconsistentGlobalCheckpoints whether to introduce inconsistent global checkpoints
* @param missingSeqNoStats whether some indices miss SeqNoStats
* @return array of ShardStats
*/
private static ShardStats[] createRandomShardStats(Map<String, long[]> expectedCheckpoints, Set<String> userIndices,
boolean skipPrimaries, boolean inconsistentGlobalCheckpoints) {
boolean skipPrimaries, boolean inconsistentGlobalCheckpoints, boolean missingSeqNoStats) {
// always create the full list
List<Index> indices = new ArrayList<>();
@ -131,6 +149,8 @@ public class DataFrameTransformsCheckpointServiceTests extends ESTestCase {
indices.add(new Index("index-2", UUIDs.randomBase64UUID(random())));
indices.add(new Index("index-3", UUIDs.randomBase64UUID(random())));
String missingSeqNoStatsIndex = randomFrom(userIndices);
List<ShardStats> shardStats = new ArrayList<>();
for (final Index index : indices) {
int numShards = randomIntBetween(1, 5);
@ -160,8 +180,15 @@ public class DataFrameTransformsCheckpointServiceTests extends ESTestCase {
long globalCheckpoint = randomBoolean() ? localCheckpoint : randomLongBetween(0L, 100000000L);
long maxSeqNo = Math.max(localCheckpoint, globalCheckpoint);
final SeqNoStats validSeqNoStats = new SeqNoStats(maxSeqNo, localCheckpoint, globalCheckpoint);
checkpoints.add(globalCheckpoint);
SeqNoStats validSeqNoStats = null;
// add broken seqNoStats if requested
if (missingSeqNoStats && index.getName().equals(missingSeqNoStatsIndex)) {
checkpoints.add(0L);
} else {
validSeqNoStats = new SeqNoStats(maxSeqNo, localCheckpoint, globalCheckpoint);
checkpoints.add(globalCheckpoint);
}
for (int replica = 0; replica < numShardCopies; replica++) {
ShardId shardId = new ShardId(index, shardIndex);