forward audits to logs (#52394)

audit messages are stored in the notifications index, so audit information is lost for integration
tests. This change forwards audit messages to logs, so they can help to debug issues.

relates: #51627
This commit is contained in:
Hendrik Muhs 2020-02-18 08:44:37 +01:00
parent bdb2e72ea4
commit 2071f85e1a
2 changed files with 74 additions and 0 deletions

View File

@ -6,11 +6,14 @@
package org.elasticsearch.xpack.transform.integration;
import org.apache.logging.log4j.Level;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.core.AcknowledgedResponse;
@ -48,12 +51,17 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.MatchAllQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchModule;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.test.rest.ESRestTestCase;
import org.joda.time.Instant;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
@ -62,6 +70,7 @@ import java.util.Base64;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@ -73,10 +82,36 @@ abstract class TransformIntegTestCase extends ESRestTestCase {
private Map<String, TransformConfig> transformConfigs = new HashMap<>();
protected void cleanUp() throws IOException {
logAudits();
cleanUpTransforms();
waitForPendingTasks();
}
private void logAudits() throws IOException {
RestHighLevelClient restClient = new TestRestHighLevelClient();
// using '*' to make this lenient and do not fail if the audit index does not exist
SearchRequest searchRequest = new SearchRequest(".transform-notifications-*");
searchRequest.source(new SearchSourceBuilder().query(new MatchAllQueryBuilder()).size(100).sort("timestamp", SortOrder.ASC));
restClient.indices().refresh(new RefreshRequest(searchRequest.indices()), RequestOptions.DEFAULT);
SearchResponse searchResponse = restClient.search(searchRequest, RequestOptions.DEFAULT);
for (SearchHit hit : searchResponse.getHits()) {
Map<String, Object> source = hit.getSourceAsMap();
String level = (String) source.getOrDefault("level", "info");
logger.log(
Level.getLevel(level.toUpperCase(Locale.ROOT)),
"Transform audit: [{}] [{}] [{}] [{}]",
Instant.ofEpochMilli((long) source.getOrDefault("timestamp", 0)),
source.getOrDefault("transform_id", "n/a"),
source.getOrDefault("message", "n/a"),
source.getOrDefault("node_name", "n/a")
);
}
}
protected void cleanUpTransforms() throws IOException {
for (TransformConfig config : transformConfigs.values()) {
stopTransform(config.getId());

View File

@ -9,6 +9,7 @@ package org.elasticsearch.xpack.transform.integration;
import org.apache.http.HttpHost;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.apache.logging.log4j.Level;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.Response;
@ -24,6 +25,7 @@ import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.test.rest.ESRestTestCase;
import org.elasticsearch.xpack.core.transform.TransformField;
import org.elasticsearch.xpack.core.transform.transforms.persistence.TransformInternalIndexConstants;
import org.joda.time.Instant;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@ -32,6 +34,7 @@ import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@ -364,6 +367,7 @@ public abstract class TransformRestTestCase extends ESRestTestCase {
@After
public void waitForTransform() throws Exception {
logAudits();
if (preserveClusterUponCompletion() == false) {
ensureNoInitializingShards();
wipeTransforms();
@ -468,4 +472,39 @@ public abstract class TransformRestTestCase extends ESRestTestCase {
protected static String getTransformEndpoint() {
return useDeprecatedEndpoints ? TransformField.REST_BASE_PATH_TRANSFORMS_DEPRECATED : TransformField.REST_BASE_PATH_TRANSFORMS;
}
@SuppressWarnings("unchecked")
private void logAudits() throws IOException {
logger.info("writing audit messages to the log");
Request searchRequest = new Request("GET", TransformInternalIndexConstants.AUDIT_INDEX + "/_search?ignore_unavailable=true");
searchRequest.setJsonEntity(
"{ \"size\": 100,"
+ " \"sort\": ["
+ " {"
+ " \"timestamp\": {"
+ " \"order\": \"asc\""
+ " }"
+ " }"
+ " ] }"
);
refreshIndex(TransformInternalIndexConstants.AUDIT_INDEX_PATTERN);
Response searchResponse = client().performRequest(searchRequest);
Map<String, Object> searchResult = entityAsMap(searchResponse);
List<Map<String, Object>> searchHits = (List<Map<String, Object>>) XContentMapValues.extractValue("hits.hits", searchResult);
for (Map<String, Object> hit : searchHits) {
Map<String, Object> source = (Map<String, Object>) XContentMapValues.extractValue("_source", hit);
String level = (String) source.getOrDefault("level", "info");
logger.log(
Level.getLevel(level.toUpperCase(Locale.ROOT)),
"Transform audit: [{}] [{}] [{}] [{}]",
Instant.ofEpochMilli((long) source.getOrDefault("timestamp", 0)),
source.getOrDefault("transform_id", "n/a"),
source.getOrDefault("message", "n/a"),
source.getOrDefault("node_name", "n/a")
);
}
}
}