mirror of
https://github.com/honeymoose/OpenSearch.git
synced 2025-02-17 10:25:15 +00:00
Write deprecation logs to a data stream (#61966)
Backport of #58924. Closes #46106. Introduce a mechanism for writing deprecation logs to a data stream as well as to disk.
This commit is contained in:
parent
a4fb501a33
commit
b7fd7cf154
@ -19,11 +19,11 @@
|
||||
|
||||
package org.elasticsearch.common.logging;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.collect.MapBuilder;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* A logger message used by {@link DeprecationLogger}.
|
||||
* Carries x-opaque-id field if provided in the headers. Will populate the x-opaque-id field in JSON logs.
|
||||
|
@ -1,5 +1,4 @@
|
||||
apply plugin: 'elasticsearch.esplugin'
|
||||
apply plugin: 'elasticsearch.internal-cluster-test'
|
||||
|
||||
esplugin {
|
||||
name 'x-pack-deprecation'
|
||||
@ -9,6 +8,15 @@ esplugin {
|
||||
}
|
||||
archivesBaseName = 'x-pack-deprecation'
|
||||
|
||||
// add all sub-projects of the qa sub-project
|
||||
gradle.projectsEvaluated {
|
||||
project.subprojects
|
||||
.find { it.path == project.path + ":qa" }
|
||||
.subprojects
|
||||
.findAll { it.path.startsWith(project.path + ":qa") }
|
||||
.each { check.dependsOn it.check }
|
||||
}
|
||||
|
||||
dependencies {
|
||||
compileOnly project(":x-pack:plugin:core")
|
||||
}
|
||||
|
0
x-pack/plugin/deprecation/qa/build.gradle
Normal file
0
x-pack/plugin/deprecation/qa/build.gradle
Normal file
27
x-pack/plugin/deprecation/qa/rest/build.gradle
Normal file
27
x-pack/plugin/deprecation/qa/rest/build.gradle
Normal file
@ -0,0 +1,27 @@
|
||||
apply plugin: 'elasticsearch.esplugin'
|
||||
apply plugin: 'elasticsearch.java-rest-test'
|
||||
|
||||
esplugin {
|
||||
description 'Deprecated query plugin'
|
||||
classname 'org.elasticsearch.xpack.deprecation.TestDeprecationPlugin'
|
||||
}
|
||||
|
||||
dependencies {
|
||||
javaRestTestImplementation("com.fasterxml.jackson.core:jackson-annotations:${versions.jackson}")
|
||||
javaRestTestImplementation("com.fasterxml.jackson.core:jackson-databind:${versions.jackson}")
|
||||
// let the javaRestTest see the classpath of main
|
||||
javaRestTestImplementation project.sourceSets.main.runtimeClasspath
|
||||
}
|
||||
|
||||
restResources {
|
||||
restApi {
|
||||
includeCore '_common', 'indices', 'index'
|
||||
}
|
||||
}
|
||||
|
||||
testClusters.all {
|
||||
testDistribution = 'DEFAULT'
|
||||
setting 'xpack.security.enabled', 'false'
|
||||
}
|
||||
|
||||
test.enabled = false
|
@ -0,0 +1,360 @@
|
||||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.deprecation;
|
||||
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import org.apache.http.Header;
|
||||
import org.apache.http.HttpEntity;
|
||||
import org.apache.http.HttpHost;
|
||||
import org.apache.http.entity.ContentType;
|
||||
import org.apache.http.entity.StringEntity;
|
||||
import org.elasticsearch.client.Request;
|
||||
import org.elasticsearch.client.RequestOptions;
|
||||
import org.elasticsearch.client.Response;
|
||||
import org.elasticsearch.client.RestClient;
|
||||
import org.elasticsearch.client.RestClientBuilder;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.logging.HeaderWarning;
|
||||
import org.elasticsearch.common.logging.LoggerMessageFormat;
|
||||
import org.elasticsearch.common.settings.Setting;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.json.JsonXContent;
|
||||
import org.elasticsearch.test.rest.ESRestTestCase;
|
||||
import org.hamcrest.Matcher;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static org.elasticsearch.test.hamcrest.RegexMatcher.matches;
|
||||
import static org.hamcrest.Matchers.allOf;
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.greaterThan;
|
||||
import static org.hamcrest.Matchers.hasEntry;
|
||||
import static org.hamcrest.Matchers.hasItem;
|
||||
import static org.hamcrest.Matchers.hasItems;
|
||||
import static org.hamcrest.Matchers.hasKey;
|
||||
import static org.hamcrest.Matchers.hasSize;
|
||||
|
||||
/**
|
||||
* Tests {@code DeprecationLogger} uses the {@code ThreadContext} to add response headers.
|
||||
*/
|
||||
public class DeprecationHttpIT extends ESRestTestCase {
|
||||
|
||||
/**
|
||||
* Check that configuring deprecation settings causes a warning to be added to the
|
||||
* response headers.
|
||||
*/
|
||||
public void testDeprecatedSettingsReturnWarnings() throws IOException {
|
||||
XContentBuilder builder = JsonXContent.contentBuilder()
|
||||
.startObject()
|
||||
.startObject("transient")
|
||||
.field(
|
||||
TestDeprecationHeaderRestAction.TEST_DEPRECATED_SETTING_TRUE1.getKey(),
|
||||
!TestDeprecationHeaderRestAction.TEST_DEPRECATED_SETTING_TRUE1.getDefault(Settings.EMPTY)
|
||||
)
|
||||
.field(
|
||||
TestDeprecationHeaderRestAction.TEST_DEPRECATED_SETTING_TRUE2.getKey(),
|
||||
!TestDeprecationHeaderRestAction.TEST_DEPRECATED_SETTING_TRUE2.getDefault(Settings.EMPTY)
|
||||
)
|
||||
// There should be no warning for this field
|
||||
.field(
|
||||
TestDeprecationHeaderRestAction.TEST_NOT_DEPRECATED_SETTING.getKey(),
|
||||
!TestDeprecationHeaderRestAction.TEST_NOT_DEPRECATED_SETTING.getDefault(Settings.EMPTY)
|
||||
)
|
||||
.endObject()
|
||||
.endObject();
|
||||
|
||||
final Request request = new Request("PUT", "_cluster/settings");
|
||||
request.setJsonEntity(Strings.toString(builder));
|
||||
final Response response = client().performRequest(request);
|
||||
|
||||
final List<String> deprecatedWarnings = getWarningHeaders(response.getHeaders());
|
||||
final List<Matcher<String>> headerMatchers = new ArrayList<>(2);
|
||||
|
||||
for (Setting<Boolean> setting : Arrays.asList(
|
||||
TestDeprecationHeaderRestAction.TEST_DEPRECATED_SETTING_TRUE1,
|
||||
TestDeprecationHeaderRestAction.TEST_DEPRECATED_SETTING_TRUE2
|
||||
)) {
|
||||
headerMatchers.add(
|
||||
equalTo(
|
||||
"["
|
||||
+ setting.getKey()
|
||||
+ "] setting was deprecated in Elasticsearch and will be removed in a "
|
||||
+ "future release! "
|
||||
+ "See the breaking changes documentation for the next major version."
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
assertThat(deprecatedWarnings, hasSize(headerMatchers.size()));
|
||||
for (final String deprecatedWarning : deprecatedWarnings) {
|
||||
assertThat(
|
||||
"Header does not conform to expected pattern",
|
||||
deprecatedWarning,
|
||||
matches(HeaderWarning.WARNING_HEADER_PATTERN.pattern())
|
||||
);
|
||||
}
|
||||
|
||||
final List<String> actualWarningValues = deprecatedWarnings.stream()
|
||||
.map(s -> HeaderWarning.extractWarningValueFromWarningHeader(s, true))
|
||||
.collect(Collectors.toList());
|
||||
for (Matcher<String> headerMatcher : headerMatchers) {
|
||||
assertThat(actualWarningValues, hasItem(headerMatcher));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Attempts to do a scatter/gather request that expects unique responses per sub-request.
|
||||
*/
|
||||
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/19222")
|
||||
public void testUniqueDeprecationResponsesMergedTogether() throws IOException {
|
||||
final String[] indices = new String[randomIntBetween(2, 5)];
|
||||
|
||||
// add at least one document for each index
|
||||
for (int i = 0; i < indices.length; ++i) {
|
||||
indices[i] = "test" + i;
|
||||
|
||||
// create indices with a single shard to reduce noise; the query only deprecates uniquely by index anyway
|
||||
createIndex(indices[i], Settings.builder().put("number_of_shards", 1).build());
|
||||
|
||||
int randomDocCount = randomIntBetween(1, 2);
|
||||
|
||||
for (int j = 0; j < randomDocCount; j++) {
|
||||
final Request request = new Request("PUT", indices[i] + "/" + j);
|
||||
request.setJsonEntity("{ \"field\": " + j + " }");
|
||||
assertOK(client().performRequest(request));
|
||||
}
|
||||
}
|
||||
|
||||
final String commaSeparatedIndices = String.join(",", indices);
|
||||
|
||||
client().performRequest(new Request("POST", commaSeparatedIndices + "/_refresh"));
|
||||
|
||||
// trigger all index deprecations
|
||||
Request request = new Request("GET", "/" + commaSeparatedIndices + "/_search");
|
||||
request.setJsonEntity("{ \"query\": { \"bool\": { \"filter\": [ { \"deprecated\": {} } ] } } }");
|
||||
Response response = client().performRequest(request);
|
||||
assertOK(response);
|
||||
|
||||
final List<String> deprecatedWarnings = getWarningHeaders(response.getHeaders());
|
||||
final List<Matcher<String>> headerMatchers = new ArrayList<>();
|
||||
|
||||
for (String index : indices) {
|
||||
headerMatchers.add(containsString(LoggerMessageFormat.format("[{}] index", (Object) index)));
|
||||
}
|
||||
|
||||
assertThat(deprecatedWarnings, hasSize(headerMatchers.size()));
|
||||
for (Matcher<String> headerMatcher : headerMatchers) {
|
||||
assertThat(deprecatedWarnings, hasItem(headerMatcher));
|
||||
}
|
||||
}
|
||||
|
||||
public void testDeprecationWarningsAppearInHeaders() throws Exception {
|
||||
doTestDeprecationWarningsAppearInHeaders();
|
||||
}
|
||||
|
||||
public void testDeprecationHeadersDoNotGetStuck() throws Exception {
|
||||
doTestDeprecationWarningsAppearInHeaders();
|
||||
doTestDeprecationWarningsAppearInHeaders();
|
||||
if (rarely()) {
|
||||
doTestDeprecationWarningsAppearInHeaders();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Run a request that receives a predictably randomized number of deprecation warnings.
|
||||
* <p>
|
||||
* Re-running this back-to-back helps to ensure that warnings are not being maintained across requests.
|
||||
*/
|
||||
private void doTestDeprecationWarningsAppearInHeaders() throws IOException {
|
||||
final boolean useDeprecatedField = randomBoolean();
|
||||
final boolean useNonDeprecatedSetting = randomBoolean();
|
||||
|
||||
// deprecated settings should also trigger a deprecation warning
|
||||
final List<Setting<Boolean>> settings = new ArrayList<>(3);
|
||||
settings.add(TestDeprecationHeaderRestAction.TEST_DEPRECATED_SETTING_TRUE1);
|
||||
|
||||
if (randomBoolean()) {
|
||||
settings.add(TestDeprecationHeaderRestAction.TEST_DEPRECATED_SETTING_TRUE2);
|
||||
}
|
||||
|
||||
if (useNonDeprecatedSetting) {
|
||||
settings.add(TestDeprecationHeaderRestAction.TEST_NOT_DEPRECATED_SETTING);
|
||||
}
|
||||
|
||||
Collections.shuffle(settings, random());
|
||||
|
||||
// trigger all deprecations
|
||||
Request request = new Request("GET", "/_test_cluster/deprecated_settings");
|
||||
request.setEntity(buildSettingsRequest(settings, useDeprecatedField));
|
||||
Response response = client().performRequest(request);
|
||||
assertOK(response);
|
||||
|
||||
final List<String> deprecatedWarnings = getWarningHeaders(response.getHeaders());
|
||||
final List<Matcher<String>> headerMatchers = new ArrayList<>(4);
|
||||
|
||||
headerMatchers.add(equalTo(TestDeprecationHeaderRestAction.DEPRECATED_ENDPOINT));
|
||||
if (useDeprecatedField) {
|
||||
headerMatchers.add(equalTo(TestDeprecationHeaderRestAction.DEPRECATED_USAGE));
|
||||
}
|
||||
|
||||
assertThat(deprecatedWarnings, hasSize(headerMatchers.size()));
|
||||
for (final String deprecatedWarning : deprecatedWarnings) {
|
||||
assertThat(deprecatedWarning, matches(HeaderWarning.WARNING_HEADER_PATTERN.pattern()));
|
||||
}
|
||||
final List<String> actualWarningValues = deprecatedWarnings.stream()
|
||||
.map(s -> HeaderWarning.extractWarningValueFromWarningHeader(s, true))
|
||||
.collect(Collectors.toList());
|
||||
for (Matcher<String> headerMatcher : headerMatchers) {
|
||||
assertThat(actualWarningValues, hasItem(headerMatcher));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Check that deprecation messages can be recorded to an index
|
||||
*/
|
||||
public void testDeprecationMessagesCanBeIndexed() throws Exception {
|
||||
try {
|
||||
configureWriteDeprecationLogsToIndex(true);
|
||||
|
||||
final Request request = new Request("GET", "/_test_cluster/deprecated_settings");
|
||||
final RequestOptions options = request.getOptions().toBuilder().addHeader("X-Opaque-Id", "some xid").build();
|
||||
request.setOptions(options);
|
||||
request.setEntity(
|
||||
buildSettingsRequest(Collections.singletonList(TestDeprecationHeaderRestAction.TEST_DEPRECATED_SETTING_TRUE1), true)
|
||||
);
|
||||
assertOK(client().performRequest(request));
|
||||
|
||||
assertBusy(() -> {
|
||||
Response response;
|
||||
try {
|
||||
response = client().performRequest(new Request("GET", "logs-deprecation-elasticsearch/_search"));
|
||||
} catch (Exception e) {
|
||||
// It can take a moment for the index to be created. If it doesn't exist then the client
|
||||
// throws an exception. Translate it into an assertion error so that assertBusy() will
|
||||
// continue trying.
|
||||
throw new AssertionError(e);
|
||||
}
|
||||
assertOK(response);
|
||||
|
||||
ObjectMapper mapper = new ObjectMapper();
|
||||
final JsonNode jsonNode = mapper.readTree(response.getEntity().getContent());
|
||||
|
||||
final int hits = jsonNode.at("/hits/total/value").intValue();
|
||||
assertThat(hits, greaterThan(0));
|
||||
|
||||
List<Map<String, Object>> documents = new ArrayList<>();
|
||||
|
||||
for (int i = 0; i < hits; i++) {
|
||||
final JsonNode hit = jsonNode.at("/hits/hits/" + i + "/_source");
|
||||
|
||||
final Map<String, Object> document = new HashMap<>();
|
||||
hit.fields().forEachRemaining(entry -> document.put(entry.getKey(), entry.getValue().textValue()));
|
||||
|
||||
documents.add(document);
|
||||
}
|
||||
|
||||
logger.warn(documents);
|
||||
assertThat(documents, hasSize(2));
|
||||
|
||||
assertThat(
|
||||
documents,
|
||||
hasItems(
|
||||
allOf(
|
||||
hasKey("@timestamp"),
|
||||
hasKey("cluster.name"),
|
||||
hasKey("cluster.uuid"),
|
||||
hasKey("component"),
|
||||
hasEntry("data_stream.datatype", "deprecation"),
|
||||
hasEntry("data_stream.namespace", "elasticsearch"),
|
||||
hasEntry("data_stream.type", "logs"),
|
||||
hasEntry("ecs.version", "1.6"),
|
||||
hasEntry("key", "deprecated_settings"),
|
||||
hasEntry("level", "DEPRECATION"),
|
||||
hasEntry("message", "[deprecated_settings] usage is deprecated. use [settings] instead"),
|
||||
hasKey("node.id"),
|
||||
hasKey("node.name"),
|
||||
hasEntry("x-opaque-id", "some xid")
|
||||
),
|
||||
allOf(
|
||||
hasKey("@timestamp"),
|
||||
hasKey("cluster.name"),
|
||||
hasKey("cluster.uuid"),
|
||||
hasKey("component"),
|
||||
hasEntry("data_stream.datatype", "deprecation"),
|
||||
hasEntry("data_stream.namespace", "elasticsearch"),
|
||||
hasEntry("data_stream.type", "logs"),
|
||||
hasEntry("ecs.version", "1.6"),
|
||||
hasEntry("key", "deprecated_route"),
|
||||
hasEntry("level", "DEPRECATION"),
|
||||
hasEntry("message", "[/_test_cluster/deprecated_settings] exists for deprecated tests"),
|
||||
hasKey("node.id"),
|
||||
hasKey("node.name"),
|
||||
hasEntry("x-opaque-id", "some xid")
|
||||
)
|
||||
)
|
||||
);
|
||||
});
|
||||
} finally {
|
||||
configureWriteDeprecationLogsToIndex(null);
|
||||
client().performRequest(new Request("DELETE", "_data_stream/logs-deprecation-elasticsearch"));
|
||||
}
|
||||
}
|
||||
|
||||
private void configureWriteDeprecationLogsToIndex(Boolean value) throws IOException {
|
||||
final Request request = new Request("PUT", "_cluster/settings");
|
||||
request.setJsonEntity("{ \"transient\": { \"cluster.deprecation_indexing.enabled\": " + value + " } }");
|
||||
final Response response = client().performRequest(request);
|
||||
assertOK(response);
|
||||
}
|
||||
|
||||
private List<String> getWarningHeaders(Header[] headers) {
|
||||
List<String> warnings = new ArrayList<>();
|
||||
|
||||
for (Header header : headers) {
|
||||
if (header.getName().equals("Warning")) {
|
||||
warnings.add(header.getValue());
|
||||
}
|
||||
}
|
||||
|
||||
return warnings;
|
||||
}
|
||||
|
||||
private HttpEntity buildSettingsRequest(List<Setting<Boolean>> settings, boolean useDeprecatedField) throws IOException {
|
||||
XContentBuilder builder = JsonXContent.contentBuilder();
|
||||
|
||||
builder.startObject().startArray(useDeprecatedField ? "deprecated_settings" : "settings");
|
||||
|
||||
for (Setting<Boolean> setting : settings) {
|
||||
builder.value(setting.getKey());
|
||||
}
|
||||
|
||||
builder.endArray().endObject();
|
||||
|
||||
return new StringEntity(Strings.toString(builder), ContentType.APPLICATION_JSON);
|
||||
}
|
||||
|
||||
/**
|
||||
* Builds a REST client that will tolerate warnings in the response headers. The default
|
||||
* is to throw an exception.
|
||||
*/
|
||||
@Override
|
||||
protected RestClient buildClient(Settings settings, HttpHost[] hosts) throws IOException {
|
||||
RestClientBuilder builder = RestClient.builder(hosts);
|
||||
configureClient(builder, settings);
|
||||
builder.setStrictDeprecationMode(false);
|
||||
return builder.build();
|
||||
}
|
||||
}
|
@ -37,15 +37,26 @@ public class TestDeprecationHeaderRestAction extends BaseRestHandler {
|
||||
private static final Logger logger = LogManager.getLogger(TestDeprecationHeaderRestAction.class);
|
||||
private static final DeprecationLogger deprecationLogger = DeprecationLogger.getLogger(logger.getName());
|
||||
|
||||
public static final Setting<Boolean> TEST_DEPRECATED_SETTING_TRUE1 =
|
||||
Setting.boolSetting("test.setting.deprecated.true1", true,
|
||||
Setting.Property.NodeScope, Setting.Property.Deprecated, Setting.Property.Dynamic);
|
||||
public static final Setting<Boolean> TEST_DEPRECATED_SETTING_TRUE2 =
|
||||
Setting.boolSetting("test.setting.deprecated.true2", true,
|
||||
Setting.Property.NodeScope, Setting.Property.Deprecated, Setting.Property.Dynamic);
|
||||
public static final Setting<Boolean> TEST_NOT_DEPRECATED_SETTING =
|
||||
Setting.boolSetting("test.setting.not_deprecated", false,
|
||||
Setting.Property.NodeScope, Setting.Property.Dynamic);
|
||||
public static final Setting<Boolean> TEST_DEPRECATED_SETTING_TRUE1 = Setting.boolSetting(
|
||||
"test.setting.deprecated.true1",
|
||||
true,
|
||||
Setting.Property.NodeScope,
|
||||
Setting.Property.Deprecated,
|
||||
Setting.Property.Dynamic
|
||||
);
|
||||
public static final Setting<Boolean> TEST_DEPRECATED_SETTING_TRUE2 = Setting.boolSetting(
|
||||
"test.setting.deprecated.true2",
|
||||
true,
|
||||
Setting.Property.NodeScope,
|
||||
Setting.Property.Deprecated,
|
||||
Setting.Property.Dynamic
|
||||
);
|
||||
public static final Setting<Boolean> TEST_NOT_DEPRECATED_SETTING = Setting.boolSetting(
|
||||
"test.setting.not_deprecated",
|
||||
false,
|
||||
Setting.Property.NodeScope,
|
||||
Setting.Property.Dynamic
|
||||
);
|
||||
|
||||
private static final Map<String, Setting<?>> SETTINGS_MAP;
|
||||
|
||||
@ -70,8 +81,7 @@ public class TestDeprecationHeaderRestAction extends BaseRestHandler {
|
||||
|
||||
@Override
|
||||
public List<DeprecatedRoute> deprecatedRoutes() {
|
||||
return singletonList(
|
||||
new DeprecatedRoute(GET, "/_test_cluster/deprecated_settings", DEPRECATED_ENDPOINT));
|
||||
return singletonList(new DeprecatedRoute(GET, "/_test_cluster/deprecated_settings", DEPRECATED_ENDPOINT));
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -95,18 +105,25 @@ public class TestDeprecationHeaderRestAction extends BaseRestHandler {
|
||||
if (source.containsKey("deprecated_settings")) {
|
||||
deprecationLogger.deprecate("deprecated_settings", DEPRECATED_USAGE);
|
||||
|
||||
settings = (List<String>)source.get("deprecated_settings");
|
||||
settings = (List<String>) source.get("deprecated_settings");
|
||||
} else {
|
||||
settings = (List<String>)source.get("settings");
|
||||
settings = (List<String>) source.get("settings");
|
||||
}
|
||||
}
|
||||
|
||||
// Pull out the settings values here in order to guarantee that any deprecation messages are triggered
|
||||
// in the same thread context.
|
||||
final Map<String, Object> settingsMap = new HashMap<>();
|
||||
for (String setting : settings) {
|
||||
settingsMap.put(setting, SETTINGS_MAP.get(setting).get(this.settings));
|
||||
}
|
||||
|
||||
return channel -> {
|
||||
final XContentBuilder builder = channel.newBuilder();
|
||||
|
||||
builder.startObject().startArray("settings");
|
||||
for (String setting : settings) {
|
||||
builder.startObject().field(setting, SETTINGS_MAP.get(setting).get(this.settings)).endObject();
|
||||
for (Map.Entry<String, Object> entry : settingsMap.entrySet()) {
|
||||
builder.startObject().field(entry.getKey(), entry.getValue()).endObject();
|
||||
}
|
||||
builder.endArray().endObject();
|
||||
channel.sendResponse(new BytesRestResponse(RestStatus.OK, builder));
|
@ -31,9 +31,15 @@ import static java.util.Collections.singletonList;
|
||||
public class TestDeprecationPlugin extends Plugin implements ActionPlugin, SearchPlugin {
|
||||
|
||||
@Override
|
||||
public List<RestHandler> getRestHandlers(Settings settings, RestController restController, ClusterSettings clusterSettings,
|
||||
IndexScopedSettings indexScopedSettings, SettingsFilter settingsFilter, IndexNameExpressionResolver indexNameExpressionResolver,
|
||||
Supplier<DiscoveryNodes> nodesInCluster) {
|
||||
public List<RestHandler> getRestHandlers(
|
||||
Settings settings,
|
||||
RestController restController,
|
||||
ClusterSettings clusterSettings,
|
||||
IndexScopedSettings indexScopedSettings,
|
||||
SettingsFilter settingsFilter,
|
||||
IndexNameExpressionResolver indexNameExpressionResolver,
|
||||
Supplier<DiscoveryNodes> nodesInCluster
|
||||
) {
|
||||
return Collections.singletonList(new TestDeprecationHeaderRestAction(settings));
|
||||
}
|
||||
|
||||
@ -42,13 +48,15 @@ public class TestDeprecationPlugin extends Plugin implements ActionPlugin, Searc
|
||||
return Arrays.asList(
|
||||
TestDeprecationHeaderRestAction.TEST_DEPRECATED_SETTING_TRUE1,
|
||||
TestDeprecationHeaderRestAction.TEST_DEPRECATED_SETTING_TRUE2,
|
||||
TestDeprecationHeaderRestAction.TEST_NOT_DEPRECATED_SETTING);
|
||||
TestDeprecationHeaderRestAction.TEST_NOT_DEPRECATED_SETTING
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<QuerySpec<?>> getQueries() {
|
||||
return singletonList(new QuerySpec<>(TestDeprecatedQueryBuilder.NAME, TestDeprecatedQueryBuilder::new,
|
||||
TestDeprecatedQueryBuilder::fromXContent));
|
||||
return singletonList(
|
||||
new QuerySpec<>(TestDeprecatedQueryBuilder.NAME, TestDeprecatedQueryBuilder::new, TestDeprecatedQueryBuilder::fromXContent)
|
||||
);
|
||||
}
|
||||
|
||||
}
|
@ -1,270 +0,0 @@
|
||||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.deprecation;
|
||||
|
||||
import org.apache.http.Header;
|
||||
import org.apache.http.HttpEntity;
|
||||
import org.apache.http.HttpHost;
|
||||
import org.apache.http.entity.ContentType;
|
||||
import org.apache.http.entity.StringEntity;
|
||||
import org.elasticsearch.action.admin.cluster.node.info.NodeInfo;
|
||||
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
|
||||
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.client.Request;
|
||||
import org.elasticsearch.client.Response;
|
||||
import org.elasticsearch.client.RestClient;
|
||||
import org.elasticsearch.client.RestClientBuilder;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.logging.HeaderWarning;
|
||||
import org.elasticsearch.common.logging.LoggerMessageFormat;
|
||||
import org.elasticsearch.common.network.NetworkAddress;
|
||||
import org.elasticsearch.common.settings.Setting;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.transport.TransportAddress;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentType;
|
||||
import org.elasticsearch.common.xcontent.json.JsonXContent;
|
||||
import org.elasticsearch.core.internal.io.IOUtils;
|
||||
import org.elasticsearch.http.HttpInfo;
|
||||
import org.elasticsearch.plugins.Plugin;
|
||||
import org.elasticsearch.test.ESSingleNodeTestCase;
|
||||
import org.elasticsearch.transport.Netty4Plugin;
|
||||
import org.elasticsearch.xpack.core.XPackPlugin;
|
||||
import org.hamcrest.Matcher;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static org.elasticsearch.common.logging.HeaderWarning.WARNING_HEADER_PATTERN;
|
||||
import static org.elasticsearch.rest.RestStatus.OK;
|
||||
import static org.elasticsearch.test.hamcrest.RegexMatcher.matches;
|
||||
import static org.elasticsearch.xpack.deprecation.TestDeprecationHeaderRestAction.TEST_DEPRECATED_SETTING_TRUE1;
|
||||
import static org.elasticsearch.xpack.deprecation.TestDeprecationHeaderRestAction.TEST_DEPRECATED_SETTING_TRUE2;
|
||||
import static org.elasticsearch.xpack.deprecation.TestDeprecationHeaderRestAction.TEST_NOT_DEPRECATED_SETTING;
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.hasItem;
|
||||
import static org.hamcrest.Matchers.hasSize;
|
||||
|
||||
/**
|
||||
* Tests {@code DeprecationLogger} uses the {@code ThreadContext} to add response headers.
|
||||
*/
|
||||
public class DeprecationHttpIT extends ESSingleNodeTestCase {
|
||||
|
||||
private static RestClient restClient;
|
||||
|
||||
@Override
|
||||
protected boolean addMockHttpTransport() {
|
||||
return false; // enable http
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Collection<Class<? extends Plugin>> getPlugins() {
|
||||
return Arrays.asList(Netty4Plugin.class, XPackPlugin.class, Deprecation.class, TestDeprecationPlugin.class);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Settings nodeSettings() {
|
||||
return Settings.builder()
|
||||
// change values of deprecated settings so that accessing them is logged
|
||||
.put(TEST_DEPRECATED_SETTING_TRUE1.getKey(), ! TEST_DEPRECATED_SETTING_TRUE1.getDefault(Settings.EMPTY))
|
||||
.put(TEST_DEPRECATED_SETTING_TRUE2.getKey(), ! TEST_DEPRECATED_SETTING_TRUE2.getDefault(Settings.EMPTY))
|
||||
// non-deprecated setting to ensure not everything is logged
|
||||
.put(TEST_NOT_DEPRECATED_SETTING.getKey(), ! TEST_NOT_DEPRECATED_SETTING.getDefault(Settings.EMPTY))
|
||||
.build();
|
||||
}
|
||||
|
||||
/**
|
||||
* Attempts to do a scatter/gather request that expects unique responses per sub-request.
|
||||
*/
|
||||
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/19222")
|
||||
public void testUniqueDeprecationResponsesMergedTogether() throws IOException {
|
||||
final String[] indices = new String[randomIntBetween(2, 5)];
|
||||
|
||||
// add at least one document for each index
|
||||
for (int i = 0; i < indices.length; ++i) {
|
||||
indices[i] = "test" + i;
|
||||
|
||||
// create indices with a single shard to reduce noise; the query only deprecates uniquely by index anyway
|
||||
assertTrue(
|
||||
client().admin()
|
||||
.indices()
|
||||
.prepareCreate(indices[i])
|
||||
.setSettings(Settings.builder().put("number_of_shards", 1))
|
||||
.get()
|
||||
.isAcknowledged()
|
||||
);
|
||||
|
||||
int randomDocCount = randomIntBetween(1, 2);
|
||||
|
||||
for (int j = 0; j < randomDocCount; ++j) {
|
||||
client().prepareIndex(indices[i], "type")
|
||||
.setId(Integer.toString(j))
|
||||
.setSource("{\"field\":" + j + "}", XContentType.JSON)
|
||||
.execute()
|
||||
.actionGet();
|
||||
}
|
||||
}
|
||||
|
||||
client().admin().indices().refresh(new RefreshRequest(indices));
|
||||
|
||||
final String commaSeparatedIndices = String.join(",", indices);
|
||||
|
||||
// trigger all index deprecations
|
||||
Request request = new Request("GET", "/" + commaSeparatedIndices + "/_search");
|
||||
request.setJsonEntity("{\"query\":{\"bool\":{\"filter\":[{\"" + TestDeprecatedQueryBuilder.NAME + "\":{}}]}}}");
|
||||
Response response = getRestClient().performRequest(request);
|
||||
assertThat(response.getStatusLine().getStatusCode(), equalTo(OK.getStatus()));
|
||||
|
||||
final List<String> deprecatedWarnings = getWarningHeaders(response.getHeaders());
|
||||
final List<Matcher<String>> headerMatchers = new ArrayList<>(indices.length);
|
||||
|
||||
for (String index : indices) {
|
||||
headerMatchers.add(containsString(LoggerMessageFormat.format("[{}] index", (Object)index)));
|
||||
}
|
||||
|
||||
assertThat(deprecatedWarnings, hasSize(headerMatchers.size()));
|
||||
for (Matcher<String> headerMatcher : headerMatchers) {
|
||||
assertThat(deprecatedWarnings, hasItem(headerMatcher));
|
||||
}
|
||||
}
|
||||
|
||||
public void testDeprecationWarningsAppearInHeaders() throws Exception {
|
||||
doTestDeprecationWarningsAppearInHeaders();
|
||||
}
|
||||
|
||||
public void testDeprecationHeadersDoNotGetStuck() throws Exception {
|
||||
doTestDeprecationWarningsAppearInHeaders();
|
||||
doTestDeprecationWarningsAppearInHeaders();
|
||||
if (rarely()) {
|
||||
doTestDeprecationWarningsAppearInHeaders();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Run a request that receives a predictably randomized number of deprecation warnings.
|
||||
* <p>
|
||||
* Re-running this back-to-back helps to ensure that warnings are not being maintained across requests.
|
||||
*/
|
||||
private void doTestDeprecationWarningsAppearInHeaders() throws IOException {
|
||||
final boolean useDeprecatedField = randomBoolean();
|
||||
final boolean useNonDeprecatedSetting = randomBoolean();
|
||||
|
||||
// deprecated settings should also trigger a deprecation warning
|
||||
final List<Setting<Boolean>> settings = new ArrayList<>(3);
|
||||
settings.add(TEST_DEPRECATED_SETTING_TRUE1);
|
||||
|
||||
if (randomBoolean()) {
|
||||
settings.add(TEST_DEPRECATED_SETTING_TRUE2);
|
||||
}
|
||||
|
||||
if (useNonDeprecatedSetting) {
|
||||
settings.add(TEST_NOT_DEPRECATED_SETTING);
|
||||
}
|
||||
|
||||
Collections.shuffle(settings, random());
|
||||
|
||||
// trigger all deprecations
|
||||
Request request = new Request("GET", "/_test_cluster/deprecated_settings");
|
||||
request.setEntity(buildSettingsRequest(settings, useDeprecatedField));
|
||||
Response response = getRestClient().performRequest(request);
|
||||
assertThat(response.getStatusLine().getStatusCode(), equalTo(OK.getStatus()));
|
||||
|
||||
final List<String> deprecatedWarnings = getWarningHeaders(response.getHeaders());
|
||||
final List<Matcher<String>> headerMatchers = new ArrayList<>(4);
|
||||
|
||||
headerMatchers.add(equalTo(TestDeprecationHeaderRestAction.DEPRECATED_ENDPOINT));
|
||||
if (useDeprecatedField) {
|
||||
headerMatchers.add(equalTo(TestDeprecationHeaderRestAction.DEPRECATED_USAGE));
|
||||
}
|
||||
for (Setting<?> setting : settings) {
|
||||
if (setting.isDeprecated()) {
|
||||
headerMatchers.add(equalTo(
|
||||
"[" + setting.getKey() + "] setting was deprecated in Elasticsearch and will be removed in a future release! " +
|
||||
"See the breaking changes documentation for the next major version."));
|
||||
}
|
||||
}
|
||||
|
||||
assertThat(deprecatedWarnings, hasSize(headerMatchers.size()));
|
||||
for (final String deprecatedWarning : deprecatedWarnings) {
|
||||
assertThat(deprecatedWarning, matches(WARNING_HEADER_PATTERN.pattern()));
|
||||
}
|
||||
final List<String> actualWarningValues =
|
||||
deprecatedWarnings.stream().map(s -> HeaderWarning.extractWarningValueFromWarningHeader(s, true))
|
||||
.collect(Collectors.toList());
|
||||
for (Matcher<String> headerMatcher : headerMatchers) {
|
||||
assertThat(actualWarningValues, hasItem(headerMatcher));
|
||||
}
|
||||
}
|
||||
|
||||
private List<String> getWarningHeaders(Header[] headers) {
|
||||
List<String> warnings = new ArrayList<>();
|
||||
|
||||
for (Header header : headers) {
|
||||
if (header.getName().equals("Warning")) {
|
||||
warnings.add(header.getValue());
|
||||
}
|
||||
}
|
||||
|
||||
return warnings;
|
||||
}
|
||||
|
||||
private HttpEntity buildSettingsRequest(List<Setting<Boolean>> settings, boolean useDeprecatedField) throws IOException {
|
||||
XContentBuilder builder = JsonXContent.contentBuilder();
|
||||
|
||||
builder.startObject().startArray(useDeprecatedField ? "deprecated_settings" : "settings");
|
||||
|
||||
for (Setting<Boolean> setting : settings) {
|
||||
builder.value(setting.getKey());
|
||||
}
|
||||
|
||||
builder.endArray().endObject();
|
||||
|
||||
return new StringEntity(Strings.toString(builder), ContentType.APPLICATION_JSON);
|
||||
}
|
||||
|
||||
protected RestClient getRestClient() {
|
||||
return getRestClient(client());
|
||||
}
|
||||
|
||||
private static synchronized RestClient getRestClient(Client client) {
|
||||
if (restClient == null) {
|
||||
restClient = buildRestClient(client);
|
||||
}
|
||||
return restClient;
|
||||
}
|
||||
|
||||
private static RestClient buildRestClient(Client client ) {
|
||||
NodesInfoResponse nodesInfoResponse = client.admin().cluster().prepareNodesInfo().get();
|
||||
assertFalse(nodesInfoResponse.hasFailures());
|
||||
assertThat(nodesInfoResponse.getNodes(), hasSize(1));
|
||||
|
||||
NodeInfo node = nodesInfoResponse.getNodes().get(0);
|
||||
assertNotNull(node.getInfo(HttpInfo.class));
|
||||
|
||||
TransportAddress publishAddress = node.getInfo(HttpInfo.class).address().publishAddress();
|
||||
InetSocketAddress address = publishAddress.address();
|
||||
final HttpHost host = new HttpHost(NetworkAddress.format(address.getAddress()), address.getPort(), "http");
|
||||
RestClientBuilder builder = RestClient.builder(host);
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void tearDown() throws Exception {
|
||||
super.tearDown();
|
||||
if (restClient != null) {
|
||||
IOUtils.closeWhileHandlingException(restClient);
|
||||
restClient = null;
|
||||
}
|
||||
}
|
||||
}
|
@ -1,9 +0,0 @@
|
||||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
grant {
|
||||
permission java.lang.RuntimePermission "*", "setContextClassLoader";
|
||||
};
|
||||
|
@ -5,27 +5,41 @@
|
||||
*/
|
||||
package org.elasticsearch.xpack.deprecation;
|
||||
|
||||
|
||||
import org.elasticsearch.action.ActionRequest;
|
||||
import org.elasticsearch.action.ActionResponse;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
|
||||
import org.elasticsearch.common.settings.ClusterSettings;
|
||||
import org.elasticsearch.common.settings.IndexScopedSettings;
|
||||
import org.elasticsearch.common.settings.Setting;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.settings.SettingsFilter;
|
||||
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
|
||||
import org.elasticsearch.env.Environment;
|
||||
import org.elasticsearch.env.NodeEnvironment;
|
||||
import org.elasticsearch.plugins.ActionPlugin;
|
||||
import org.elasticsearch.plugins.Plugin;
|
||||
import org.elasticsearch.repositories.RepositoriesService;
|
||||
import org.elasticsearch.rest.RestController;
|
||||
import org.elasticsearch.rest.RestHandler;
|
||||
import org.elasticsearch.script.ScriptService;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.watcher.ResourceWatcherService;
|
||||
import org.elasticsearch.xpack.core.deprecation.DeprecationInfoAction;
|
||||
import org.elasticsearch.xpack.core.deprecation.NodesDeprecationCheckAction;
|
||||
import org.elasticsearch.xpack.deprecation.logging.DeprecationIndexingComponent;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
import static org.elasticsearch.xpack.deprecation.logging.DeprecationIndexingComponent.WRITE_DEPRECATION_LOGS_TO_INDEX;
|
||||
|
||||
/**
|
||||
* The plugin class for the Deprecation API
|
||||
*/
|
||||
@ -47,4 +61,30 @@ public class Deprecation extends Plugin implements ActionPlugin {
|
||||
|
||||
return Collections.singletonList(new RestDeprecationInfoAction());
|
||||
}
|
||||
|
||||
@Override
|
||||
public Collection<Object> createComponents(
|
||||
Client client,
|
||||
ClusterService clusterService,
|
||||
ThreadPool threadPool,
|
||||
ResourceWatcherService resourceWatcherService,
|
||||
ScriptService scriptService,
|
||||
NamedXContentRegistry xContentRegistry,
|
||||
Environment environment,
|
||||
NodeEnvironment nodeEnvironment,
|
||||
NamedWriteableRegistry namedWriteableRegistry,
|
||||
IndexNameExpressionResolver indexNameExpressionResolver,
|
||||
Supplier<RepositoriesService> repositoriesServiceSupplier
|
||||
) {
|
||||
DeprecationIndexingComponent component = new DeprecationIndexingComponent(client, environment.settings());
|
||||
|
||||
clusterService.addListener(component);
|
||||
|
||||
return Collections.singletonList(component);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<Setting<?>> getSettings() {
|
||||
return Collections.singletonList(WRITE_DEPRECATION_LOGS_TO_INDEX);
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,85 @@
|
||||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.xpack.deprecation.logging;
|
||||
|
||||
import org.apache.logging.log4j.core.Appender;
|
||||
import org.apache.logging.log4j.core.Core;
|
||||
import org.apache.logging.log4j.core.Filter;
|
||||
import org.apache.logging.log4j.core.Layout;
|
||||
import org.apache.logging.log4j.core.LogEvent;
|
||||
import org.apache.logging.log4j.core.appender.AbstractAppender;
|
||||
import org.apache.logging.log4j.core.config.plugins.Plugin;
|
||||
import org.elasticsearch.action.DocWriteRequest;
|
||||
import org.elasticsearch.action.index.IndexRequest;
|
||||
import org.elasticsearch.common.xcontent.XContentType;
|
||||
|
||||
import java.util.Objects;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
/**
|
||||
* This log4j appender writes deprecation log messages to an index. It does not perform the actual
|
||||
* writes, but instead constructs an {@link IndexRequest} for the log message and passes that
|
||||
* to a callback.
|
||||
*/
|
||||
@Plugin(name = "DeprecationIndexingAppender", category = Core.CATEGORY_NAME, elementType = Appender.ELEMENT_TYPE)
|
||||
public class DeprecationIndexingAppender extends AbstractAppender {
|
||||
public static final String DEPRECATION_MESSAGES_DATA_STREAM = "logs-deprecation-elasticsearch";
|
||||
|
||||
private final Consumer<IndexRequest> requestConsumer;
|
||||
|
||||
/**
|
||||
* You can't start and stop an appender to toggle it, so this flag reflects whether
|
||||
* writes should in fact be carried out.
|
||||
*/
|
||||
private volatile boolean isEnabled = false;
|
||||
|
||||
/**
|
||||
* Creates a new appender.
|
||||
* @param name the appender's name
|
||||
* @param filter a filter to apply directly on the appender
|
||||
* @param layout the layout to use for formatting message. It must return a JSON string.
|
||||
* @param requestConsumer a callback to handle the actual indexing of the log message.
|
||||
*/
|
||||
public DeprecationIndexingAppender(String name, Filter filter, Layout<String> layout, Consumer<IndexRequest> requestConsumer) {
|
||||
super(name, filter, layout);
|
||||
this.requestConsumer = Objects.requireNonNull(requestConsumer, "requestConsumer cannot be null");
|
||||
}
|
||||
|
||||
/**
|
||||
* Constructs an index request for a deprecation message, and passes it to the callback that was
|
||||
* supplied to {@link #DeprecationIndexingAppender(String, Filter, Layout, Consumer)}.
|
||||
*/
|
||||
@Override
|
||||
public void append(LogEvent event) {
|
||||
if (this.isEnabled == false) {
|
||||
return;
|
||||
}
|
||||
|
||||
final byte[] payload = this.getLayout().toByteArray(event);
|
||||
|
||||
final IndexRequest request = new IndexRequest(DEPRECATION_MESSAGES_DATA_STREAM).source(payload, XContentType.JSON)
|
||||
.opType(DocWriteRequest.OpType.CREATE);
|
||||
|
||||
this.requestConsumer.accept(request);
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets whether this appender is enabled or disabled. When disabled, the appender will
|
||||
* not perform indexing operations.
|
||||
* @param isEnabled the enabled status of the appender.
|
||||
*/
|
||||
public void setEnabled(boolean isEnabled) {
|
||||
this.isEnabled = isEnabled;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns whether the appender is enabled i.e. performing indexing operations.
|
||||
*/
|
||||
public boolean isEnabled() {
|
||||
return isEnabled;
|
||||
}
|
||||
}
|
@ -0,0 +1,164 @@
|
||||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.xpack.deprecation.logging;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Map;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.apache.logging.log4j.core.LoggerContext;
|
||||
import org.apache.logging.log4j.core.config.Configuration;
|
||||
import org.elasticsearch.action.bulk.BackoffPolicy;
|
||||
import org.elasticsearch.action.bulk.BulkItemResponse;
|
||||
import org.elasticsearch.action.bulk.BulkProcessor;
|
||||
import org.elasticsearch.action.bulk.BulkRequest;
|
||||
import org.elasticsearch.action.bulk.BulkResponse;
|
||||
import org.elasticsearch.action.index.IndexRequest;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.client.OriginSettingClient;
|
||||
import org.elasticsearch.cluster.ClusterChangedEvent;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.ClusterStateListener;
|
||||
import org.elasticsearch.common.component.AbstractLifecycleComponent;
|
||||
import org.elasticsearch.common.logging.Loggers;
|
||||
import org.elasticsearch.common.logging.RateLimitingFilter;
|
||||
import org.elasticsearch.common.settings.Setting;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.ByteSizeUnit;
|
||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.util.concurrent.EsExecutors;
|
||||
import org.elasticsearch.xpack.core.ClientHelper;
|
||||
|
||||
/**
|
||||
* This component manages the construction and lifecycle of the {@link DeprecationIndexingAppender}.
|
||||
* It also starts and stops the appender
|
||||
*/
|
||||
public class DeprecationIndexingComponent extends AbstractLifecycleComponent implements ClusterStateListener {
|
||||
private static final Logger logger = LogManager.getLogger(DeprecationIndexingComponent.class);
|
||||
|
||||
public static final Setting<Boolean> WRITE_DEPRECATION_LOGS_TO_INDEX = Setting.boolSetting(
|
||||
"cluster.deprecation_indexing.enabled",
|
||||
false,
|
||||
Setting.Property.NodeScope,
|
||||
Setting.Property.Dynamic
|
||||
);
|
||||
|
||||
private final DeprecationIndexingAppender appender;
|
||||
private final BulkProcessor processor;
|
||||
private final RateLimitingFilter filter;
|
||||
|
||||
public DeprecationIndexingComponent(Client client, Settings settings) {
|
||||
this.processor = getBulkProcessor(new OriginSettingClient(client, ClientHelper.DEPRECATION_ORIGIN), settings);
|
||||
final Consumer<IndexRequest> consumer = this.processor::add;
|
||||
|
||||
final LoggerContext context = (LoggerContext) LogManager.getContext(false);
|
||||
final Configuration configuration = context.getConfiguration();
|
||||
|
||||
final EcsJsonLayout layout = EcsJsonLayout.newBuilder()
|
||||
.setType("deprecation")
|
||||
.setESMessageFields("key,x-opaque-id")
|
||||
.setConfiguration(configuration)
|
||||
.build();
|
||||
|
||||
this.filter = new RateLimitingFilter();
|
||||
this.appender = new DeprecationIndexingAppender("deprecation_indexing_appender", filter, layout, consumer);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doStart() {
|
||||
this.appender.start();
|
||||
Loggers.addAppender(LogManager.getLogger("org.elasticsearch.deprecation"), this.appender);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doStop() {
|
||||
Loggers.removeAppender(LogManager.getLogger("org.elasticsearch.deprecation"), this.appender);
|
||||
this.appender.stop();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doClose() {
|
||||
this.processor.close();
|
||||
}
|
||||
|
||||
/**
|
||||
* Listens for changes to the cluster state, in order to know whether to toggle indexing
|
||||
* and to set the cluster UUID and node ID. These can't be set in the constructor because
|
||||
* the initial cluster state won't be set yet.
|
||||
*
|
||||
* @param event the cluster state event to process
|
||||
*/
|
||||
@Override
|
||||
public void clusterChanged(ClusterChangedEvent event) {
|
||||
final ClusterState state = event.state();
|
||||
final boolean newEnabled = WRITE_DEPRECATION_LOGS_TO_INDEX.get(state.getMetadata().settings());
|
||||
if (appender.isEnabled() != newEnabled) {
|
||||
// We've flipped from disabled to enabled. Make sure we start with a clean cache of
|
||||
// previously-seen keys, otherwise we won't index anything.
|
||||
if (newEnabled) {
|
||||
this.filter.reset();
|
||||
}
|
||||
appender.setEnabled(newEnabled);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Constructs a bulk processor for writing documents
|
||||
*
|
||||
* @param client the client to use
|
||||
* @param settings the settings to use
|
||||
* @return an initialised bulk processor
|
||||
*/
|
||||
private BulkProcessor getBulkProcessor(Client client, Settings settings) {
|
||||
final OriginSettingClient originSettingClient = new OriginSettingClient(client, ClientHelper.DEPRECATION_ORIGIN);
|
||||
final BulkProcessor.Listener listener = new DeprecationBulkListener();
|
||||
|
||||
// This configuration disables the size count and size thresholds,
|
||||
// and instead uses a scheduled flush only. This means that calling
|
||||
// processor.add() will not block the calling thread.
|
||||
return BulkProcessor.builder(originSettingClient::bulk, listener)
|
||||
.setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(1000), 3))
|
||||
.setConcurrentRequests(Math.max(2, EsExecutors.allocatedProcessors(settings)))
|
||||
.setBulkActions(-1)
|
||||
.setBulkSize(new ByteSizeValue(-1, ByteSizeUnit.BYTES))
|
||||
.setFlushInterval(TimeValue.timeValueSeconds(5))
|
||||
.build();
|
||||
}
|
||||
|
||||
private static class DeprecationBulkListener implements BulkProcessor.Listener {
|
||||
@Override
|
||||
public void beforeBulk(long executionId, BulkRequest request) {}
|
||||
|
||||
@Override
|
||||
public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
|
||||
long numberOfActions = request.numberOfActions();
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace(
|
||||
"indexed [{}] deprecation documents into [{}]",
|
||||
numberOfActions,
|
||||
Arrays.stream(response.getItems()).map(BulkItemResponse::getIndex).distinct().collect(Collectors.joining(","))
|
||||
);
|
||||
}
|
||||
|
||||
if (response.hasFailures()) {
|
||||
Map<String, String> failures = Arrays.stream(response.getItems())
|
||||
.filter(BulkItemResponse::isFailed)
|
||||
.collect(Collectors.toMap(BulkItemResponse::getId, BulkItemResponse::getFailureMessage));
|
||||
logger.error("Bulk write of deprecation logs encountered some failures: [{}]", failures);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
|
||||
logger.error("Bulk write of " + request.numberOfActions() + " deprecation logs failed: " + failure.getMessage(), failure);
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,200 @@
|
||||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.xpack.deprecation.logging;
|
||||
|
||||
import java.nio.charset.Charset;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import org.apache.logging.log4j.core.Layout;
|
||||
import org.apache.logging.log4j.core.LogEvent;
|
||||
import org.apache.logging.log4j.core.config.Node;
|
||||
import org.apache.logging.log4j.core.config.plugins.Plugin;
|
||||
import org.apache.logging.log4j.core.config.plugins.PluginAttribute;
|
||||
import org.apache.logging.log4j.core.config.plugins.PluginBuilderFactory;
|
||||
import org.apache.logging.log4j.core.config.plugins.PluginFactory;
|
||||
import org.apache.logging.log4j.core.layout.AbstractStringLayout;
|
||||
import org.apache.logging.log4j.core.layout.ByteBufferDestination;
|
||||
import org.apache.logging.log4j.core.layout.PatternLayout;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.logging.ESJsonLayout;
|
||||
|
||||
/**
|
||||
* This is in essense a fork of {@link ESJsonLayout}, with tweaks to align the output more closely
|
||||
* with ECS. This will be removed in the next major release of ES.
|
||||
*/
|
||||
@Plugin(name = "EcsJsonLayout", category = Node.CATEGORY, elementType = Layout.ELEMENT_TYPE, printObject = true)
|
||||
public class EcsJsonLayout extends AbstractStringLayout {
|
||||
private static final String ECS_VERSION = "1.6";
|
||||
|
||||
private final PatternLayout patternLayout;
|
||||
|
||||
protected EcsJsonLayout(String typeName, Charset charset, String[] esmessagefields) {
|
||||
super(charset);
|
||||
this.patternLayout = PatternLayout.newBuilder()
|
||||
.withPattern(pattern(typeName, esmessagefields))
|
||||
.withAlwaysWriteExceptions(false)
|
||||
.build();
|
||||
}
|
||||
|
||||
protected String pattern(String type, String[] esMessageFields) {
|
||||
if (Strings.isEmpty(type)) {
|
||||
throw new IllegalArgumentException("layout parameter 'type_name' cannot be empty");
|
||||
}
|
||||
Map<String, Object> map = new LinkedHashMap<>();
|
||||
map.put("type", inQuotes(type));
|
||||
map.put("@timestamp", inQuotes("%d{yyyy-MM-dd'T'HH:mm:ss,SSSZZ}"));
|
||||
map.put("level", inQuotes("%p"));
|
||||
map.put("component", inQuotes("%c"));
|
||||
map.put("cluster.name", inQuotes("${sys:es.logs.cluster_name}"));
|
||||
map.put("node.name", inQuotes("%node_name"));
|
||||
map.put("message", inQuotes("%notEmpty{%enc{%marker}{JSON} }%enc{%.-10000m}{JSON}"));
|
||||
map.put("data_stream.type", inQuotes("logs"));
|
||||
map.put("data_stream.datatype", inQuotes("deprecation"));
|
||||
map.put("data_stream.namespace", inQuotes("elasticsearch"));
|
||||
map.put("ecs.version", inQuotes(ECS_VERSION));
|
||||
|
||||
for (String key : esMessageFields) {
|
||||
map.put(key, inQuotes("%ESMessageField{" + key + "}"));
|
||||
}
|
||||
return createPattern(map, Stream.of(esMessageFields).collect(Collectors.toSet()));
|
||||
}
|
||||
|
||||
|
||||
protected String createPattern(Map<String, Object> map, Set<String> esMessageFields) {
|
||||
StringBuilder sb = new StringBuilder();
|
||||
sb.append("{");
|
||||
String separator = "";
|
||||
for (Map.Entry<String, Object> entry : map.entrySet()) {
|
||||
|
||||
if (esMessageFields.contains(entry.getKey())) {
|
||||
sb.append("%notEmpty{");
|
||||
sb.append(separator);
|
||||
appendField(sb, entry);
|
||||
sb.append("}");
|
||||
} else {
|
||||
sb.append(separator);
|
||||
appendField(sb, entry);
|
||||
}
|
||||
|
||||
separator = ", ";
|
||||
}
|
||||
sb.append(notEmpty(", %node_and_cluster_id "));
|
||||
sb.append("%exceptionAsJson ");
|
||||
sb.append("}");
|
||||
sb.append(System.lineSeparator());
|
||||
|
||||
return sb.toString();
|
||||
}
|
||||
|
||||
private void appendField(StringBuilder sb, Map.Entry<String, Object> entry) {
|
||||
sb.append(jsonKey(entry.getKey()));
|
||||
sb.append(entry.getValue().toString());
|
||||
}
|
||||
|
||||
private String notEmpty(String value) {
|
||||
return "%notEmpty{" + value + "}";
|
||||
}
|
||||
|
||||
private CharSequence jsonKey(String s) {
|
||||
return inQuotes(s) + ": ";
|
||||
}
|
||||
|
||||
protected String inQuotes(String s) {
|
||||
return "\"" + s + "\"";
|
||||
}
|
||||
|
||||
@PluginFactory
|
||||
public static EcsJsonLayout createLayout(String type,
|
||||
Charset charset,
|
||||
String[] esmessagefields) {
|
||||
return new EcsJsonLayout(type, charset, esmessagefields);
|
||||
}
|
||||
|
||||
PatternLayout getPatternLayout() {
|
||||
return patternLayout;
|
||||
}
|
||||
|
||||
public static class Builder<B extends EcsJsonLayout.Builder<B>> extends AbstractStringLayout.Builder<B>
|
||||
implements org.apache.logging.log4j.core.util.Builder<EcsJsonLayout> {
|
||||
|
||||
@PluginAttribute("type_name")
|
||||
String type;
|
||||
|
||||
@PluginAttribute(value = "charset", defaultString = "UTF-8")
|
||||
Charset charset;
|
||||
|
||||
@PluginAttribute("esmessagefields")
|
||||
private String esMessageFields;
|
||||
|
||||
public Builder() {
|
||||
setCharset(StandardCharsets.UTF_8);
|
||||
}
|
||||
|
||||
@Override
|
||||
public EcsJsonLayout build() {
|
||||
String[] split = Strings.isNullOrEmpty(esMessageFields) ? new String[]{} : esMessageFields.split(",");
|
||||
return EcsJsonLayout.createLayout(type, charset, split);
|
||||
}
|
||||
|
||||
public Charset getCharset() {
|
||||
return charset;
|
||||
}
|
||||
|
||||
public B setCharset(final Charset charset) {
|
||||
this.charset = charset;
|
||||
return asBuilder();
|
||||
}
|
||||
|
||||
public String getType() {
|
||||
return type;
|
||||
}
|
||||
|
||||
public B setType(final String type) {
|
||||
this.type = type;
|
||||
return asBuilder();
|
||||
}
|
||||
|
||||
public String getESMessageFields() {
|
||||
return esMessageFields;
|
||||
}
|
||||
|
||||
public B setESMessageFields(String esmessagefields) {
|
||||
this.esMessageFields = esmessagefields;
|
||||
return asBuilder();
|
||||
}
|
||||
}
|
||||
|
||||
@PluginBuilderFactory
|
||||
public static <B extends EcsJsonLayout.Builder<B>> B newBuilder() {
|
||||
return new EcsJsonLayout.Builder<B>().asBuilder();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toSerializable(final LogEvent event) {
|
||||
return patternLayout.toSerializable(event);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, String> getContentFormat() {
|
||||
return patternLayout.getContentFormat();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void encode(final LogEvent event, final ByteBufferDestination destination) {
|
||||
patternLayout.encode(event, destination);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "EcsJsonLayout{patternLayout=" + patternLayout + '}';
|
||||
}
|
||||
}
|
@ -0,0 +1,84 @@
|
||||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.xpack.deprecation;
|
||||
|
||||
import org.apache.logging.log4j.core.Layout;
|
||||
import org.apache.logging.log4j.core.LogEvent;
|
||||
import org.elasticsearch.action.index.IndexRequest;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.xpack.deprecation.logging.DeprecationIndexingAppender;
|
||||
import org.junit.Before;
|
||||
import org.mockito.ArgumentCaptor;
|
||||
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.Map;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
import static org.hamcrest.Matchers.hasEntry;
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.never;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
public class DeprecationIndexingAppenderTests extends ESTestCase {
|
||||
|
||||
private DeprecationIndexingAppender appender;
|
||||
private Layout<String> layout;
|
||||
private Consumer<IndexRequest> consumer;
|
||||
|
||||
@Before
|
||||
@SuppressWarnings("unchecked")
|
||||
public void initialize() {
|
||||
layout = mock(Layout.class);
|
||||
consumer = mock(Consumer.class);
|
||||
appender = new DeprecationIndexingAppender("a name", null, layout, consumer);
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks that the service does not attempt to index messages when the service
|
||||
* is disabled.
|
||||
*/
|
||||
public void testDoesNotWriteMessageWhenServiceDisabled() {
|
||||
appender.append(mock(LogEvent.class));
|
||||
|
||||
verify(consumer, never()).accept(any());
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks that the service can be disabled after being enabled.
|
||||
*/
|
||||
public void testDoesNotWriteMessageWhenServiceEnabledAndDisabled() {
|
||||
appender.setEnabled(true);
|
||||
appender.setEnabled(false);
|
||||
|
||||
appender.append(mock(LogEvent.class));
|
||||
|
||||
verify(consumer, never()).accept(any());
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks that messages are indexed in the correct shape when the service is enabled.
|
||||
* Formatted is handled entirely by the configured Layout, so that is not verified here.
|
||||
*/
|
||||
public void testWritesMessageWhenServiceEnabled() {
|
||||
appender.setEnabled(true);
|
||||
|
||||
when(layout.toByteArray(any())).thenReturn("{ \"some key\": \"some value\" }".getBytes(StandardCharsets.UTF_8));
|
||||
|
||||
appender.append(mock(LogEvent.class));
|
||||
|
||||
ArgumentCaptor<IndexRequest> argument = ArgumentCaptor.forClass(IndexRequest.class);
|
||||
|
||||
verify(consumer).accept(argument.capture());
|
||||
|
||||
final IndexRequest indexRequest = argument.getValue();
|
||||
final Map<String, Object> payloadMap = indexRequest.sourceAsMap();
|
||||
|
||||
assertThat(payloadMap, hasEntry("some key", "some value"));
|
||||
}
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user