mirror of
https://github.com/honeymoose/OpenSearch.git
synced 2025-02-24 13:55:57 +00:00
[Transform] improve TransformRestTestCase robustness (#55786)
handles/retries temporary SearchPhaseExecutionErrors fixes #54810
This commit is contained in:
parent
6f392cf5b9
commit
4b93f17b24
@ -487,7 +487,7 @@ public abstract class TransformRestTestCase extends ESRestTestCase {
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private void logAudits() throws IOException {
|
||||
private void logAudits() throws Exception {
|
||||
logger.info("writing audit messages to the log");
|
||||
Request searchRequest = new Request("GET", TransformInternalIndexConstants.AUDIT_INDEX + "/_search?ignore_unavailable=true");
|
||||
searchRequest.setJsonEntity(
|
||||
@ -501,23 +501,36 @@ public abstract class TransformRestTestCase extends ESRestTestCase {
|
||||
+ " ] }"
|
||||
);
|
||||
|
||||
refreshIndex(TransformInternalIndexConstants.AUDIT_INDEX_PATTERN);
|
||||
assertBusy(() -> {
|
||||
try {
|
||||
refreshIndex(TransformInternalIndexConstants.AUDIT_INDEX_PATTERN);
|
||||
Response searchResponse = client().performRequest(searchRequest);
|
||||
|
||||
Response searchResponse = client().performRequest(searchRequest);
|
||||
Map<String, Object> searchResult = entityAsMap(searchResponse);
|
||||
List<Map<String, Object>> searchHits = (List<Map<String, Object>>) XContentMapValues.extractValue("hits.hits", searchResult);
|
||||
Map<String, Object> searchResult = entityAsMap(searchResponse);
|
||||
List<Map<String, Object>> searchHits = (List<Map<String, Object>>) XContentMapValues.extractValue(
|
||||
"hits.hits",
|
||||
searchResult
|
||||
);
|
||||
|
||||
for (Map<String, Object> hit : searchHits) {
|
||||
Map<String, Object> source = (Map<String, Object>) XContentMapValues.extractValue("_source", hit);
|
||||
String level = (String) source.getOrDefault("level", "info");
|
||||
logger.log(
|
||||
Level.getLevel(level.toUpperCase(Locale.ROOT)),
|
||||
"Transform audit: [{}] [{}] [{}] [{}]",
|
||||
Instant.ofEpochMilli((long) source.getOrDefault("timestamp", 0)),
|
||||
source.getOrDefault("transform_id", "n/a"),
|
||||
source.getOrDefault("message", "n/a"),
|
||||
source.getOrDefault("node_name", "n/a")
|
||||
);
|
||||
}
|
||||
for (Map<String, Object> hit : searchHits) {
|
||||
Map<String, Object> source = (Map<String, Object>) XContentMapValues.extractValue("_source", hit);
|
||||
String level = (String) source.getOrDefault("level", "info");
|
||||
logger.log(
|
||||
Level.getLevel(level.toUpperCase(Locale.ROOT)),
|
||||
"Transform audit: [{}] [{}] [{}] [{}]",
|
||||
Instant.ofEpochMilli((long) source.getOrDefault("timestamp", 0)),
|
||||
source.getOrDefault("transform_id", "n/a"),
|
||||
source.getOrDefault("message", "n/a"),
|
||||
source.getOrDefault("node_name", "n/a")
|
||||
);
|
||||
}
|
||||
} catch (ResponseException e) {
|
||||
// see gh#54810, wrap temporary 503's as assertion error for retry
|
||||
if (e.getResponse().getStatusLine().getStatusCode() != 503) {
|
||||
throw e;
|
||||
}
|
||||
throw new AssertionError("Failed to retrieve audit logs", e);
|
||||
}
|
||||
}, 5, TimeUnit.SECONDS);
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user