Log write failures for watcher history document. (#44129) (#44357)

The failure is correctly getting propagated, this commit adds support to
explicitly look for .watch-history failures using the same logging strategy
as triggered watch failures.
This commit is contained in:
Jake Landis 2019-07-16 08:48:09 -05:00 committed by GitHub
parent fb0461ac76
commit eb7d43f4cf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 13 additions and 4 deletions

View File

@ -334,14 +334,23 @@ public class Watcher extends Plugin implements ActionPlugin, ScriptPlugin, Reloa
@Override @Override
public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
if (response.hasFailures()) { if (response.hasFailures()) {
Map<String, String> triggeredWatches = Arrays.stream(response.getItems()) Map<String, String> triggeredFailures = Arrays.stream(response.getItems())
.filter(BulkItemResponse::isFailed) .filter(BulkItemResponse::isFailed)
.filter(r -> r.getIndex().startsWith(TriggeredWatchStoreField.INDEX_NAME)) .filter(r -> r.getIndex().startsWith(TriggeredWatchStoreField.INDEX_NAME))
.collect(Collectors.toMap(BulkItemResponse::getId, BulkItemResponse::getFailureMessage)); .collect(Collectors.toMap(BulkItemResponse::getId, BulkItemResponse::getFailureMessage));
if (triggeredWatches.isEmpty() == false) { Map<String, String> historyFailures = Arrays.stream(response.getItems())
String failure = triggeredWatches.values().stream().collect(Collectors.joining(", ")); .filter(BulkItemResponse::isFailed)
.filter(r -> r.getIndex().startsWith(HistoryStoreField.INDEX_PREFIX))
.collect(Collectors.toMap(BulkItemResponse::getId, BulkItemResponse::getFailureMessage));
if (triggeredFailures.isEmpty() == false) {
String failure = triggeredFailures.values().stream().collect(Collectors.joining(", "));
logger.error("triggered watches could not be deleted {}, failure [{}]", logger.error("triggered watches could not be deleted {}, failure [{}]",
triggeredWatches.keySet(), Strings.substring(failure, 0, 2000)); triggeredFailures.keySet(), Strings.substring(failure, 0, 2000));
}
if (historyFailures.isEmpty() == false) {
String failure = historyFailures.values().stream().collect(Collectors.joining(", "));
logger.error("watch history could not be written {}, failure [{}]",
historyFailures.keySet(), Strings.substring(failure, 0, 2000));
} }
Map<String, String> overwrittenIds = Arrays.stream(response.getItems()) Map<String, String> overwrittenIds = Arrays.stream(response.getItems())