Remove deprecation indexing code from 7.10 (#63942)
The deprecation indexing code was writing to a regular data stream, and it is not yet possible to hide a data stream or prefix it with a period. This functionality we be re-added once it is possible to mark a data stream as hidden, and also to not rely on the standard logs template since that can be disabled.
This commit is contained in:
parent
7551c4dc7f
commit
bfd2cbed86
|
@ -5,15 +5,25 @@
|
|||
*/
|
||||
package org.elasticsearch.xpack.deprecation;
|
||||
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import static org.elasticsearch.test.hamcrest.RegexMatcher.matches;
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.hasItem;
|
||||
import static org.hamcrest.Matchers.hasSize;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.apache.http.Header;
|
||||
import org.apache.http.HttpEntity;
|
||||
import org.apache.http.HttpHost;
|
||||
import org.apache.http.entity.ContentType;
|
||||
import org.apache.http.entity.StringEntity;
|
||||
import org.elasticsearch.client.Request;
|
||||
import org.elasticsearch.client.RequestOptions;
|
||||
import org.elasticsearch.client.Response;
|
||||
import org.elasticsearch.client.RestClient;
|
||||
import org.elasticsearch.client.RestClientBuilder;
|
||||
|
@ -27,26 +37,6 @@ import org.elasticsearch.common.xcontent.json.JsonXContent;
|
|||
import org.elasticsearch.test.rest.ESRestTestCase;
|
||||
import org.hamcrest.Matcher;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static org.elasticsearch.test.hamcrest.RegexMatcher.matches;
|
||||
import static org.hamcrest.Matchers.allOf;
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.greaterThan;
|
||||
import static org.hamcrest.Matchers.hasEntry;
|
||||
import static org.hamcrest.Matchers.hasItem;
|
||||
import static org.hamcrest.Matchers.hasItems;
|
||||
import static org.hamcrest.Matchers.hasKey;
|
||||
import static org.hamcrest.Matchers.hasSize;
|
||||
|
||||
/**
|
||||
* Tests {@code DeprecationLogger} uses the {@code ThreadContext} to add response headers.
|
||||
*/
|
||||
|
@ -222,104 +212,6 @@ public class DeprecationHttpIT extends ESRestTestCase {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Check that deprecation messages can be recorded to an index
|
||||
*/
|
||||
public void testDeprecationMessagesCanBeIndexed() throws Exception {
|
||||
try {
|
||||
configureWriteDeprecationLogsToIndex(true);
|
||||
|
||||
final Request request = new Request("GET", "/_test_cluster/deprecated_settings");
|
||||
final RequestOptions options = request.getOptions().toBuilder().addHeader("X-Opaque-Id", "some xid").build();
|
||||
request.setOptions(options);
|
||||
request.setEntity(
|
||||
buildSettingsRequest(Collections.singletonList(TestDeprecationHeaderRestAction.TEST_DEPRECATED_SETTING_TRUE1), true)
|
||||
);
|
||||
assertOK(client().performRequest(request));
|
||||
|
||||
assertBusy(() -> {
|
||||
Response response;
|
||||
try {
|
||||
response = client().performRequest(new Request("GET", "logs-deprecation-elasticsearch/_search"));
|
||||
} catch (Exception e) {
|
||||
// It can take a moment for the index to be created. If it doesn't exist then the client
|
||||
// throws an exception. Translate it into an assertion error so that assertBusy() will
|
||||
// continue trying.
|
||||
throw new AssertionError(e);
|
||||
}
|
||||
assertOK(response);
|
||||
|
||||
ObjectMapper mapper = new ObjectMapper();
|
||||
final JsonNode jsonNode = mapper.readTree(response.getEntity().getContent());
|
||||
|
||||
final int hits = jsonNode.at("/hits/total/value").intValue();
|
||||
assertThat(hits, greaterThan(0));
|
||||
|
||||
List<Map<String, Object>> documents = new ArrayList<>();
|
||||
|
||||
for (int i = 0; i < hits; i++) {
|
||||
final JsonNode hit = jsonNode.at("/hits/hits/" + i + "/_source");
|
||||
|
||||
final Map<String, Object> document = new HashMap<>();
|
||||
hit.fields().forEachRemaining(entry -> document.put(entry.getKey(), entry.getValue().textValue()));
|
||||
|
||||
documents.add(document);
|
||||
}
|
||||
|
||||
logger.warn(documents);
|
||||
assertThat(documents, hasSize(2));
|
||||
|
||||
assertThat(
|
||||
documents,
|
||||
hasItems(
|
||||
allOf(
|
||||
hasKey("@timestamp"),
|
||||
hasKey("cluster.name"),
|
||||
hasKey("cluster.uuid"),
|
||||
hasKey("component"),
|
||||
hasEntry("data_stream.dataset", "deprecation.elasticsearch"),
|
||||
hasEntry("data_stream.namespace", "default"),
|
||||
hasEntry("data_stream.type", "logs"),
|
||||
hasEntry("ecs.version", "1.6"),
|
||||
hasEntry("key", "deprecated_settings"),
|
||||
hasEntry("level", "DEPRECATION"),
|
||||
hasEntry("message", "[deprecated_settings] usage is deprecated. use [settings] instead"),
|
||||
hasKey("node.id"),
|
||||
hasKey("node.name"),
|
||||
hasEntry("x-opaque-id", "some xid")
|
||||
),
|
||||
allOf(
|
||||
hasKey("@timestamp"),
|
||||
hasKey("cluster.name"),
|
||||
hasKey("cluster.uuid"),
|
||||
hasKey("component"),
|
||||
hasEntry("data_stream.dataset", "deprecation.elasticsearch"),
|
||||
hasEntry("data_stream.namespace", "default"),
|
||||
hasEntry("data_stream.type", "logs"),
|
||||
hasEntry("ecs.version", "1.6"),
|
||||
hasEntry("key", "deprecated_route"),
|
||||
hasEntry("level", "DEPRECATION"),
|
||||
hasEntry("message", "[/_test_cluster/deprecated_settings] exists for deprecated tests"),
|
||||
hasKey("node.id"),
|
||||
hasKey("node.name"),
|
||||
hasEntry("x-opaque-id", "some xid")
|
||||
)
|
||||
)
|
||||
);
|
||||
});
|
||||
} finally {
|
||||
configureWriteDeprecationLogsToIndex(null);
|
||||
client().performRequest(new Request("DELETE", "_data_stream/logs-deprecation-elasticsearch"));
|
||||
}
|
||||
}
|
||||
|
||||
private void configureWriteDeprecationLogsToIndex(Boolean value) throws IOException {
|
||||
final Request request = new Request("PUT", "_cluster/settings");
|
||||
request.setJsonEntity("{ \"transient\": { \"cluster.deprecation_indexing.enabled\": " + value + " } }");
|
||||
final Response response = client().performRequest(request);
|
||||
assertOK(response);
|
||||
}
|
||||
|
||||
private List<String> getWarningHeaders(Header[] headers) {
|
||||
List<String> warnings = new ArrayList<>();
|
||||
|
||||
|
|
|
@ -5,40 +5,25 @@
|
|||
*/
|
||||
package org.elasticsearch.xpack.deprecation;
|
||||
|
||||
import org.elasticsearch.action.ActionRequest;
|
||||
import org.elasticsearch.action.ActionResponse;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
|
||||
import org.elasticsearch.common.settings.ClusterSettings;
|
||||
import org.elasticsearch.common.settings.IndexScopedSettings;
|
||||
import org.elasticsearch.common.settings.Setting;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.settings.SettingsFilter;
|
||||
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
|
||||
import org.elasticsearch.env.Environment;
|
||||
import org.elasticsearch.env.NodeEnvironment;
|
||||
import org.elasticsearch.plugins.ActionPlugin;
|
||||
import org.elasticsearch.plugins.Plugin;
|
||||
import org.elasticsearch.repositories.RepositoriesService;
|
||||
import org.elasticsearch.rest.RestController;
|
||||
import org.elasticsearch.rest.RestHandler;
|
||||
import org.elasticsearch.script.ScriptService;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.watcher.ResourceWatcherService;
|
||||
import org.elasticsearch.xpack.core.deprecation.DeprecationInfoAction;
|
||||
import org.elasticsearch.xpack.core.deprecation.NodesDeprecationCheckAction;
|
||||
import org.elasticsearch.xpack.deprecation.logging.DeprecationIndexingComponent;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
import static org.elasticsearch.xpack.deprecation.logging.DeprecationIndexingComponent.WRITE_DEPRECATION_LOGS_TO_INDEX;
|
||||
import org.elasticsearch.action.ActionRequest;
|
||||
import org.elasticsearch.action.ActionResponse;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||
import org.elasticsearch.common.settings.ClusterSettings;
|
||||
import org.elasticsearch.common.settings.IndexScopedSettings;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.settings.SettingsFilter;
|
||||
import org.elasticsearch.plugins.ActionPlugin;
|
||||
import org.elasticsearch.plugins.Plugin;
|
||||
import org.elasticsearch.rest.RestController;
|
||||
import org.elasticsearch.rest.RestHandler;
|
||||
import org.elasticsearch.xpack.core.deprecation.DeprecationInfoAction;
|
||||
import org.elasticsearch.xpack.core.deprecation.NodesDeprecationCheckAction;
|
||||
|
||||
/**
|
||||
* The plugin class for the Deprecation API
|
||||
|
@ -61,30 +46,4 @@ public class Deprecation extends Plugin implements ActionPlugin {
|
|||
|
||||
return Collections.singletonList(new RestDeprecationInfoAction());
|
||||
}
|
||||
|
||||
@Override
|
||||
public Collection<Object> createComponents(
|
||||
Client client,
|
||||
ClusterService clusterService,
|
||||
ThreadPool threadPool,
|
||||
ResourceWatcherService resourceWatcherService,
|
||||
ScriptService scriptService,
|
||||
NamedXContentRegistry xContentRegistry,
|
||||
Environment environment,
|
||||
NodeEnvironment nodeEnvironment,
|
||||
NamedWriteableRegistry namedWriteableRegistry,
|
||||
IndexNameExpressionResolver indexNameExpressionResolver,
|
||||
Supplier<RepositoriesService> repositoriesServiceSupplier
|
||||
) {
|
||||
DeprecationIndexingComponent component = new DeprecationIndexingComponent(client, environment.settings());
|
||||
|
||||
clusterService.addListener(component);
|
||||
|
||||
return Collections.singletonList(component);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<Setting<?>> getSettings() {
|
||||
return Collections.singletonList(WRITE_DEPRECATION_LOGS_TO_INDEX);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,85 +0,0 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.xpack.deprecation.logging;
|
||||
|
||||
import org.apache.logging.log4j.core.Appender;
|
||||
import org.apache.logging.log4j.core.Core;
|
||||
import org.apache.logging.log4j.core.Filter;
|
||||
import org.apache.logging.log4j.core.Layout;
|
||||
import org.apache.logging.log4j.core.LogEvent;
|
||||
import org.apache.logging.log4j.core.appender.AbstractAppender;
|
||||
import org.apache.logging.log4j.core.config.plugins.Plugin;
|
||||
import org.elasticsearch.action.DocWriteRequest;
|
||||
import org.elasticsearch.action.index.IndexRequest;
|
||||
import org.elasticsearch.common.xcontent.XContentType;
|
||||
|
||||
import java.util.Objects;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
/**
|
||||
* This log4j appender writes deprecation log messages to an index. It does not perform the actual
|
||||
* writes, but instead constructs an {@link IndexRequest} for the log message and passes that
|
||||
* to a callback.
|
||||
*/
|
||||
@Plugin(name = "DeprecationIndexingAppender", category = Core.CATEGORY_NAME, elementType = Appender.ELEMENT_TYPE)
|
||||
public class DeprecationIndexingAppender extends AbstractAppender {
|
||||
public static final String DEPRECATION_MESSAGES_DATA_STREAM = "logs-deprecation-elasticsearch";
|
||||
|
||||
private final Consumer<IndexRequest> requestConsumer;
|
||||
|
||||
/**
|
||||
* You can't start and stop an appender to toggle it, so this flag reflects whether
|
||||
* writes should in fact be carried out.
|
||||
*/
|
||||
private volatile boolean isEnabled = false;
|
||||
|
||||
/**
|
||||
* Creates a new appender.
|
||||
* @param name the appender's name
|
||||
* @param filter a filter to apply directly on the appender
|
||||
* @param layout the layout to use for formatting message. It must return a JSON string.
|
||||
* @param requestConsumer a callback to handle the actual indexing of the log message.
|
||||
*/
|
||||
public DeprecationIndexingAppender(String name, Filter filter, Layout<String> layout, Consumer<IndexRequest> requestConsumer) {
|
||||
super(name, filter, layout);
|
||||
this.requestConsumer = Objects.requireNonNull(requestConsumer, "requestConsumer cannot be null");
|
||||
}
|
||||
|
||||
/**
|
||||
* Constructs an index request for a deprecation message, and passes it to the callback that was
|
||||
* supplied to {@link #DeprecationIndexingAppender(String, Filter, Layout, Consumer)}.
|
||||
*/
|
||||
@Override
|
||||
public void append(LogEvent event) {
|
||||
if (this.isEnabled == false) {
|
||||
return;
|
||||
}
|
||||
|
||||
final byte[] payload = this.getLayout().toByteArray(event);
|
||||
|
||||
final IndexRequest request = new IndexRequest(DEPRECATION_MESSAGES_DATA_STREAM).source(payload, XContentType.JSON)
|
||||
.opType(DocWriteRequest.OpType.CREATE);
|
||||
|
||||
this.requestConsumer.accept(request);
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets whether this appender is enabled or disabled. When disabled, the appender will
|
||||
* not perform indexing operations.
|
||||
* @param isEnabled the enabled status of the appender.
|
||||
*/
|
||||
public void setEnabled(boolean isEnabled) {
|
||||
this.isEnabled = isEnabled;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns whether the appender is enabled i.e. performing indexing operations.
|
||||
*/
|
||||
public boolean isEnabled() {
|
||||
return isEnabled;
|
||||
}
|
||||
}
|
|
@ -1,164 +0,0 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.xpack.deprecation.logging;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Map;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.apache.logging.log4j.core.LoggerContext;
|
||||
import org.apache.logging.log4j.core.config.Configuration;
|
||||
import org.elasticsearch.action.bulk.BackoffPolicy;
|
||||
import org.elasticsearch.action.bulk.BulkItemResponse;
|
||||
import org.elasticsearch.action.bulk.BulkProcessor;
|
||||
import org.elasticsearch.action.bulk.BulkRequest;
|
||||
import org.elasticsearch.action.bulk.BulkResponse;
|
||||
import org.elasticsearch.action.index.IndexRequest;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.client.OriginSettingClient;
|
||||
import org.elasticsearch.cluster.ClusterChangedEvent;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.ClusterStateListener;
|
||||
import org.elasticsearch.common.component.AbstractLifecycleComponent;
|
||||
import org.elasticsearch.common.logging.Loggers;
|
||||
import org.elasticsearch.common.logging.RateLimitingFilter;
|
||||
import org.elasticsearch.common.settings.Setting;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.ByteSizeUnit;
|
||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.util.concurrent.EsExecutors;
|
||||
import org.elasticsearch.xpack.core.ClientHelper;
|
||||
|
||||
/**
|
||||
* This component manages the construction and lifecycle of the {@link DeprecationIndexingAppender}.
|
||||
* It also starts and stops the appender
|
||||
*/
|
||||
public class DeprecationIndexingComponent extends AbstractLifecycleComponent implements ClusterStateListener {
|
||||
private static final Logger logger = LogManager.getLogger(DeprecationIndexingComponent.class);
|
||||
|
||||
public static final Setting<Boolean> WRITE_DEPRECATION_LOGS_TO_INDEX = Setting.boolSetting(
|
||||
"cluster.deprecation_indexing.enabled",
|
||||
false,
|
||||
Setting.Property.NodeScope,
|
||||
Setting.Property.Dynamic
|
||||
);
|
||||
|
||||
private final DeprecationIndexingAppender appender;
|
||||
private final BulkProcessor processor;
|
||||
private final RateLimitingFilter filter;
|
||||
|
||||
public DeprecationIndexingComponent(Client client, Settings settings) {
|
||||
this.processor = getBulkProcessor(new OriginSettingClient(client, ClientHelper.DEPRECATION_ORIGIN), settings);
|
||||
final Consumer<IndexRequest> consumer = this.processor::add;
|
||||
|
||||
final LoggerContext context = (LoggerContext) LogManager.getContext(false);
|
||||
final Configuration configuration = context.getConfiguration();
|
||||
|
||||
final EcsJsonLayout layout = EcsJsonLayout.newBuilder()
|
||||
.setType("deprecation")
|
||||
.setESMessageFields("key,x-opaque-id")
|
||||
.setConfiguration(configuration)
|
||||
.build();
|
||||
|
||||
this.filter = new RateLimitingFilter();
|
||||
this.appender = new DeprecationIndexingAppender("deprecation_indexing_appender", filter, layout, consumer);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doStart() {
|
||||
this.appender.start();
|
||||
Loggers.addAppender(LogManager.getLogger("org.elasticsearch.deprecation"), this.appender);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doStop() {
|
||||
Loggers.removeAppender(LogManager.getLogger("org.elasticsearch.deprecation"), this.appender);
|
||||
this.appender.stop();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doClose() {
|
||||
this.processor.close();
|
||||
}
|
||||
|
||||
/**
|
||||
* Listens for changes to the cluster state, in order to know whether to toggle indexing
|
||||
* and to set the cluster UUID and node ID. These can't be set in the constructor because
|
||||
* the initial cluster state won't be set yet.
|
||||
*
|
||||
* @param event the cluster state event to process
|
||||
*/
|
||||
@Override
|
||||
public void clusterChanged(ClusterChangedEvent event) {
|
||||
final ClusterState state = event.state();
|
||||
final boolean newEnabled = WRITE_DEPRECATION_LOGS_TO_INDEX.get(state.getMetadata().settings());
|
||||
if (appender.isEnabled() != newEnabled) {
|
||||
// We've flipped from disabled to enabled. Make sure we start with a clean cache of
|
||||
// previously-seen keys, otherwise we won't index anything.
|
||||
if (newEnabled) {
|
||||
this.filter.reset();
|
||||
}
|
||||
appender.setEnabled(newEnabled);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Constructs a bulk processor for writing documents
|
||||
*
|
||||
* @param client the client to use
|
||||
* @param settings the settings to use
|
||||
* @return an initialised bulk processor
|
||||
*/
|
||||
private BulkProcessor getBulkProcessor(Client client, Settings settings) {
|
||||
final OriginSettingClient originSettingClient = new OriginSettingClient(client, ClientHelper.DEPRECATION_ORIGIN);
|
||||
final BulkProcessor.Listener listener = new DeprecationBulkListener();
|
||||
|
||||
// This configuration disables the size count and size thresholds,
|
||||
// and instead uses a scheduled flush only. This means that calling
|
||||
// processor.add() will not block the calling thread.
|
||||
return BulkProcessor.builder(originSettingClient::bulk, listener)
|
||||
.setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(1000), 3))
|
||||
.setConcurrentRequests(Math.max(2, EsExecutors.allocatedProcessors(settings)))
|
||||
.setBulkActions(-1)
|
||||
.setBulkSize(new ByteSizeValue(-1, ByteSizeUnit.BYTES))
|
||||
.setFlushInterval(TimeValue.timeValueSeconds(5))
|
||||
.build();
|
||||
}
|
||||
|
||||
private static class DeprecationBulkListener implements BulkProcessor.Listener {
|
||||
@Override
|
||||
public void beforeBulk(long executionId, BulkRequest request) {}
|
||||
|
||||
@Override
|
||||
public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
|
||||
long numberOfActions = request.numberOfActions();
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace(
|
||||
"indexed [{}] deprecation documents into [{}]",
|
||||
numberOfActions,
|
||||
Arrays.stream(response.getItems()).map(BulkItemResponse::getIndex).distinct().collect(Collectors.joining(","))
|
||||
);
|
||||
}
|
||||
|
||||
if (response.hasFailures()) {
|
||||
Map<String, String> failures = Arrays.stream(response.getItems())
|
||||
.filter(BulkItemResponse::isFailed)
|
||||
.collect(Collectors.toMap(BulkItemResponse::getId, BulkItemResponse::getFailureMessage));
|
||||
logger.error("Bulk write of deprecation logs encountered some failures: [{}]", failures);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
|
||||
logger.error("Bulk write of " + request.numberOfActions() + " deprecation logs failed: " + failure.getMessage(), failure);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,200 +0,0 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.xpack.deprecation.logging;
|
||||
|
||||
import java.nio.charset.Charset;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import org.apache.logging.log4j.core.Layout;
|
||||
import org.apache.logging.log4j.core.LogEvent;
|
||||
import org.apache.logging.log4j.core.config.Node;
|
||||
import org.apache.logging.log4j.core.config.plugins.Plugin;
|
||||
import org.apache.logging.log4j.core.config.plugins.PluginAttribute;
|
||||
import org.apache.logging.log4j.core.config.plugins.PluginBuilderFactory;
|
||||
import org.apache.logging.log4j.core.config.plugins.PluginFactory;
|
||||
import org.apache.logging.log4j.core.layout.AbstractStringLayout;
|
||||
import org.apache.logging.log4j.core.layout.ByteBufferDestination;
|
||||
import org.apache.logging.log4j.core.layout.PatternLayout;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.logging.ESJsonLayout;
|
||||
|
||||
/**
|
||||
* This is in essense a fork of {@link ESJsonLayout}, with tweaks to align the output more closely
|
||||
* with ECS. This will be removed in the next major release of ES.
|
||||
*/
|
||||
@Plugin(name = "EcsJsonLayout", category = Node.CATEGORY, elementType = Layout.ELEMENT_TYPE, printObject = true)
|
||||
public class EcsJsonLayout extends AbstractStringLayout {
|
||||
private static final String ECS_VERSION = "1.6";
|
||||
|
||||
private final PatternLayout patternLayout;
|
||||
|
||||
protected EcsJsonLayout(String typeName, Charset charset, String[] esmessagefields) {
|
||||
super(charset);
|
||||
this.patternLayout = PatternLayout.newBuilder()
|
||||
.withPattern(pattern(typeName, esmessagefields))
|
||||
.withAlwaysWriteExceptions(false)
|
||||
.build();
|
||||
}
|
||||
|
||||
protected String pattern(String type, String[] esMessageFields) {
|
||||
if (Strings.isEmpty(type)) {
|
||||
throw new IllegalArgumentException("layout parameter 'type_name' cannot be empty");
|
||||
}
|
||||
Map<String, Object> map = new LinkedHashMap<>();
|
||||
map.put("type", inQuotes(type));
|
||||
map.put("@timestamp", inQuotes("%d{yyyy-MM-dd'T'HH:mm:ss,SSSZZ}"));
|
||||
map.put("level", inQuotes("%p"));
|
||||
map.put("component", inQuotes("%c"));
|
||||
map.put("cluster.name", inQuotes("${sys:es.logs.cluster_name}"));
|
||||
map.put("node.name", inQuotes("%node_name"));
|
||||
map.put("message", inQuotes("%notEmpty{%enc{%marker}{JSON} }%enc{%.-10000m}{JSON}"));
|
||||
map.put("data_stream.type", inQuotes("logs"));
|
||||
map.put("data_stream.dataset", inQuotes("deprecation.elasticsearch"));
|
||||
map.put("data_stream.namespace", inQuotes("default"));
|
||||
map.put("ecs.version", inQuotes(ECS_VERSION));
|
||||
|
||||
for (String key : esMessageFields) {
|
||||
map.put(key, inQuotes("%ESMessageField{" + key + "}"));
|
||||
}
|
||||
return createPattern(map, Stream.of(esMessageFields).collect(Collectors.toSet()));
|
||||
}
|
||||
|
||||
|
||||
protected String createPattern(Map<String, Object> map, Set<String> esMessageFields) {
|
||||
StringBuilder sb = new StringBuilder();
|
||||
sb.append("{");
|
||||
String separator = "";
|
||||
for (Map.Entry<String, Object> entry : map.entrySet()) {
|
||||
|
||||
if (esMessageFields.contains(entry.getKey())) {
|
||||
sb.append("%notEmpty{");
|
||||
sb.append(separator);
|
||||
appendField(sb, entry);
|
||||
sb.append("}");
|
||||
} else {
|
||||
sb.append(separator);
|
||||
appendField(sb, entry);
|
||||
}
|
||||
|
||||
separator = ", ";
|
||||
}
|
||||
sb.append(notEmpty(", %node_and_cluster_id "));
|
||||
sb.append("%exceptionAsJson ");
|
||||
sb.append("}");
|
||||
sb.append(System.lineSeparator());
|
||||
|
||||
return sb.toString();
|
||||
}
|
||||
|
||||
private void appendField(StringBuilder sb, Map.Entry<String, Object> entry) {
|
||||
sb.append(jsonKey(entry.getKey()));
|
||||
sb.append(entry.getValue().toString());
|
||||
}
|
||||
|
||||
private String notEmpty(String value) {
|
||||
return "%notEmpty{" + value + "}";
|
||||
}
|
||||
|
||||
private CharSequence jsonKey(String s) {
|
||||
return inQuotes(s) + ": ";
|
||||
}
|
||||
|
||||
protected String inQuotes(String s) {
|
||||
return "\"" + s + "\"";
|
||||
}
|
||||
|
||||
@PluginFactory
|
||||
public static EcsJsonLayout createLayout(String type,
|
||||
Charset charset,
|
||||
String[] esmessagefields) {
|
||||
return new EcsJsonLayout(type, charset, esmessagefields);
|
||||
}
|
||||
|
||||
PatternLayout getPatternLayout() {
|
||||
return patternLayout;
|
||||
}
|
||||
|
||||
public static class Builder<B extends EcsJsonLayout.Builder<B>> extends AbstractStringLayout.Builder<B>
|
||||
implements org.apache.logging.log4j.core.util.Builder<EcsJsonLayout> {
|
||||
|
||||
@PluginAttribute("type_name")
|
||||
String type;
|
||||
|
||||
@PluginAttribute(value = "charset", defaultString = "UTF-8")
|
||||
Charset charset;
|
||||
|
||||
@PluginAttribute("esmessagefields")
|
||||
private String esMessageFields;
|
||||
|
||||
public Builder() {
|
||||
setCharset(StandardCharsets.UTF_8);
|
||||
}
|
||||
|
||||
@Override
|
||||
public EcsJsonLayout build() {
|
||||
String[] split = Strings.isNullOrEmpty(esMessageFields) ? new String[]{} : esMessageFields.split(",");
|
||||
return EcsJsonLayout.createLayout(type, charset, split);
|
||||
}
|
||||
|
||||
public Charset getCharset() {
|
||||
return charset;
|
||||
}
|
||||
|
||||
public B setCharset(final Charset charset) {
|
||||
this.charset = charset;
|
||||
return asBuilder();
|
||||
}
|
||||
|
||||
public String getType() {
|
||||
return type;
|
||||
}
|
||||
|
||||
public B setType(final String type) {
|
||||
this.type = type;
|
||||
return asBuilder();
|
||||
}
|
||||
|
||||
public String getESMessageFields() {
|
||||
return esMessageFields;
|
||||
}
|
||||
|
||||
public B setESMessageFields(String esmessagefields) {
|
||||
this.esMessageFields = esmessagefields;
|
||||
return asBuilder();
|
||||
}
|
||||
}
|
||||
|
||||
@PluginBuilderFactory
|
||||
public static <B extends EcsJsonLayout.Builder<B>> B newBuilder() {
|
||||
return new EcsJsonLayout.Builder<B>().asBuilder();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toSerializable(final LogEvent event) {
|
||||
return patternLayout.toSerializable(event);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, String> getContentFormat() {
|
||||
return patternLayout.getContentFormat();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void encode(final LogEvent event, final ByteBufferDestination destination) {
|
||||
patternLayout.encode(event, destination);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "EcsJsonLayout{patternLayout=" + patternLayout + '}';
|
||||
}
|
||||
}
|
|
@ -1,84 +0,0 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.xpack.deprecation;
|
||||
|
||||
import org.apache.logging.log4j.core.Layout;
|
||||
import org.apache.logging.log4j.core.LogEvent;
|
||||
import org.elasticsearch.action.index.IndexRequest;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.xpack.deprecation.logging.DeprecationIndexingAppender;
|
||||
import org.junit.Before;
|
||||
import org.mockito.ArgumentCaptor;
|
||||
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.Map;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
import static org.hamcrest.Matchers.hasEntry;
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.never;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
public class DeprecationIndexingAppenderTests extends ESTestCase {
|
||||
|
||||
private DeprecationIndexingAppender appender;
|
||||
private Layout<String> layout;
|
||||
private Consumer<IndexRequest> consumer;
|
||||
|
||||
@Before
|
||||
@SuppressWarnings("unchecked")
|
||||
public void initialize() {
|
||||
layout = mock(Layout.class);
|
||||
consumer = mock(Consumer.class);
|
||||
appender = new DeprecationIndexingAppender("a name", null, layout, consumer);
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks that the service does not attempt to index messages when the service
|
||||
* is disabled.
|
||||
*/
|
||||
public void testDoesNotWriteMessageWhenServiceDisabled() {
|
||||
appender.append(mock(LogEvent.class));
|
||||
|
||||
verify(consumer, never()).accept(any());
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks that the service can be disabled after being enabled.
|
||||
*/
|
||||
public void testDoesNotWriteMessageWhenServiceEnabledAndDisabled() {
|
||||
appender.setEnabled(true);
|
||||
appender.setEnabled(false);
|
||||
|
||||
appender.append(mock(LogEvent.class));
|
||||
|
||||
verify(consumer, never()).accept(any());
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks that messages are indexed in the correct shape when the service is enabled.
|
||||
* Formatted is handled entirely by the configured Layout, so that is not verified here.
|
||||
*/
|
||||
public void testWritesMessageWhenServiceEnabled() {
|
||||
appender.setEnabled(true);
|
||||
|
||||
when(layout.toByteArray(any())).thenReturn("{ \"some key\": \"some value\" }".getBytes(StandardCharsets.UTF_8));
|
||||
|
||||
appender.append(mock(LogEvent.class));
|
||||
|
||||
ArgumentCaptor<IndexRequest> argument = ArgumentCaptor.forClass(IndexRequest.class);
|
||||
|
||||
verify(consumer).accept(argument.capture());
|
||||
|
||||
final IndexRequest indexRequest = argument.getValue();
|
||||
final Map<String, Object> payloadMap = indexRequest.sourceAsMap();
|
||||
|
||||
assertThat(payloadMap, hasEntry("some key", "some value"));
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue