[ML] Adding audits when deprecation warnings occur with datafeed start (#36233)

* [ML] Adding audits when deprecation warnings occur with datafeed start

* adjusting parameters for log format call
This commit is contained in:
Benjamin Trent 2018-12-06 15:58:37 -06:00 committed by GitHub
parent d1184cfb68
commit 3e04a90e99
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 182 additions and 17 deletions

View File

@ -10,6 +10,7 @@ import org.elasticsearch.Version;
import org.elasticsearch.cluster.AbstractDiffable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.TriFunction;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.TimeValue;
@ -46,7 +47,6 @@ import java.util.Map;
import java.util.Objects;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
/**
* Datafeed configuration options. Describes where to proactively pull input
@ -65,9 +65,10 @@ public class DatafeedConfig extends AbstractDiffable<DatafeedConfig> implements
private static final int TWENTY_MINS_SECONDS = 20 * SECONDS_IN_MINUTE;
private static final int HALF_DAY_SECONDS = 12 * 60 * SECONDS_IN_MINUTE;
static final XContentObjectTransformer<QueryBuilder> QUERY_TRANSFORMER = XContentObjectTransformer.queryBuilderTransformer();
private static final BiFunction<Map<String, Object>, String, QueryBuilder> lazyQueryParser = (objectMap, id) -> {
static final TriFunction<Map<String, Object>, String, List<String>, QueryBuilder> lazyQueryParser =
(objectMap, id, warnings) -> {
try {
return QUERY_TRANSFORMER.fromMap(objectMap);
return QUERY_TRANSFORMER.fromMap(objectMap, warnings);
} catch (IOException | XContentParseException exception) {
// Certain thrown exceptions wrap up the real Illegal argument making it hard to determine cause for the user
if (exception.getCause() instanceof IllegalArgumentException) {
@ -85,9 +86,10 @@ public class DatafeedConfig extends AbstractDiffable<DatafeedConfig> implements
};
static final XContentObjectTransformer<AggregatorFactories.Builder> AGG_TRANSFORMER = XContentObjectTransformer.aggregatorTransformer();
private static final BiFunction<Map<String, Object>, String, AggregatorFactories.Builder> lazyAggParser = (objectMap, id) -> {
static final TriFunction<Map<String, Object>, String, List<String>, AggregatorFactories.Builder> lazyAggParser =
(objectMap, id, warnings) -> {
try {
return AGG_TRANSFORMER.fromMap(objectMap);
return AGG_TRANSFORMER.fromMap(objectMap, warnings);
} catch (IOException | XContentParseException exception) {
// Certain thrown exceptions wrap up the real Illegal argument making it hard to determine cause for the user
if (exception.getCause() instanceof IllegalArgumentException) {
@ -237,8 +239,8 @@ public class DatafeedConfig extends AbstractDiffable<DatafeedConfig> implements
this.chunkingConfig = chunkingConfig;
this.headers = Collections.unmodifiableMap(headers);
this.delayedDataCheckConfig = delayedDataCheckConfig;
this.querySupplier = new CachedSupplier<>(() -> lazyQueryParser.apply(query, id));
this.aggSupplier = new CachedSupplier<>(() -> lazyAggParser.apply(aggregations, id));
this.querySupplier = new CachedSupplier<>(() -> lazyQueryParser.apply(query, id, new ArrayList<>()));
this.aggSupplier = new CachedSupplier<>(() -> lazyAggParser.apply(aggregations, id, new ArrayList<>()));
}
public DatafeedConfig(StreamInput in) throws IOException {
@ -284,8 +286,8 @@ public class DatafeedConfig extends AbstractDiffable<DatafeedConfig> implements
} else {
delayedDataCheckConfig = DelayedDataCheckConfig.defaultDelayedDataCheckConfig();
}
this.querySupplier = new CachedSupplier<>(() -> lazyQueryParser.apply(query, id));
this.aggSupplier = new CachedSupplier<>(() -> lazyAggParser.apply(aggregations, id));
this.querySupplier = new CachedSupplier<>(() -> lazyQueryParser.apply(query, id, new ArrayList<>()));
this.aggSupplier = new CachedSupplier<>(() -> lazyAggParser.apply(aggregations, id, new ArrayList<>()));
}
public String getId() {
@ -320,6 +322,20 @@ public class DatafeedConfig extends AbstractDiffable<DatafeedConfig> implements
return querySupplier.get();
}
/**
* Calls the lazy parser and returns any gathered deprecations
* @return The deprecations from parsing the query
*/
public List<String> getQueryDeprecations() {
return getQueryDeprecations(lazyQueryParser);
}
List<String> getQueryDeprecations(TriFunction<Map<String, Object>, String, List<String>, QueryBuilder> parser) {
List<String> deprecations = new ArrayList<>();
parser.apply(query, id, deprecations);
return deprecations;
}
public Map<String, Object> getQuery() {
return query;
}
@ -328,6 +344,20 @@ public class DatafeedConfig extends AbstractDiffable<DatafeedConfig> implements
return aggSupplier.get();
}
/**
* Calls the lazy parser and returns any gathered deprecations
* @return The deprecations from parsing the aggregations
*/
public List<String> getAggDeprecations() {
return getAggDeprecations(lazyAggParser);
}
List<String> getAggDeprecations(TriFunction<Map<String, Object>, String, List<String>, AggregatorFactories.Builder> parser) {
List<String> deprecations = new ArrayList<>();
parser.apply(aggregations, id, deprecations);
return deprecations;
}
public Map<String, Object> getAggregations() {
return aggregations;
}
@ -758,7 +788,8 @@ public class DatafeedConfig extends AbstractDiffable<DatafeedConfig> implements
if (aggregations == null) {
chunkingConfig = ChunkingConfig.newAuto();
} else {
long histogramIntervalMillis = ExtractorUtils.getHistogramIntervalMillis(lazyAggParser.apply(aggregations, id));
long histogramIntervalMillis =
ExtractorUtils.getHistogramIntervalMillis(lazyAggParser.apply(aggregations, id, new ArrayList<>()));
chunkingConfig = ChunkingConfig.newManual(TimeValue.timeValueMillis(
DEFAULT_AGGREGATION_CHUNKING_BUCKETS * histogramIntervalMillis));
}

View File

@ -29,16 +29,14 @@ public class LoggingDeprecationAccumulationHandler implements DeprecationHandler
public void usedDeprecatedName(String usedName, String modernName) {
LoggingDeprecationHandler.INSTANCE.usedDeprecatedName(usedName, modernName);
deprecations.add(LoggerMessageFormat.format("Deprecated field [{}] used, expected [{}] instead",
usedName,
modernName));
new Object[] {usedName, modernName}));
}
@Override
public void usedDeprecatedField(String usedName, String replacedWith) {
LoggingDeprecationHandler.INSTANCE.usedDeprecatedField(usedName, replacedWith);
deprecations.add(LoggerMessageFormat.format("Deprecated field [{}] used, replaced by [{}]",
usedName,
replacedWith));
new Object[] {usedName, replacedWith}));
}
/**

View File

@ -22,7 +22,9 @@ import org.elasticsearch.search.SearchModule;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
/**
@ -61,7 +63,27 @@ public class XContentObjectTransformer<T extends ToXContentObject> {
this.registry = registry;
}
/**
* Parses the map into the type T with the previously supplied parserFunction
* All deprecation warnings are ignored
* @param stringObjectMap The Map to parse into the Object
* @return parsed object T
* @throws IOException When there is an unforeseen parsing issue
*/
public T fromMap(Map<String, Object> stringObjectMap) throws IOException {
return fromMap(stringObjectMap, new ArrayList<>());
}
/**
* Parses the map into the type T with the previously supplied parserFunction
* All deprecation warnings are added to the passed deprecationWarnings list.
*
* @param stringObjectMap The Map to parse into the Object
* @param deprecationWarnings The list to which to add all deprecation warnings
* @return parsed object T
* @throws IOException When there is an unforeseen parsing issue
*/
public T fromMap(Map<String, Object> stringObjectMap, List<String> deprecationWarnings) throws IOException {
if (stringObjectMap == null) {
return null;
}
@ -72,8 +94,9 @@ public class XContentObjectTransformer<T extends ToXContentObject> {
.createParser(registry,
deprecationLogger,
BytesReference.bytes(xContentBuilder).streamInput())) {
//TODO do something with the accumulated deprecation warnings
return parserFunction.apply(parser);
T retVal = parserFunction.apply(parser);
deprecationWarnings.addAll(deprecationLogger.getDeprecations());
return retVal;
}
}

View File

@ -57,9 +57,12 @@ import java.util.TimeZone;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.not;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
public class DatafeedConfigTests extends AbstractSerializingTestCase<DatafeedConfig> {
@ -577,6 +580,34 @@ public class DatafeedConfigTests extends AbstractSerializingTestCase<DatafeedCon
assertEquals(TimeValue.timeValueHours(1), datafeed.defaultFrequency(TimeValue.timeValueHours(12)));
}
public void testGetAggDeprecations() {
DatafeedConfig datafeed = createDatafeedWithDateHistogram("1h");
String deprecationWarning = "Warning";
List<String> deprecations = datafeed.getAggDeprecations((map, id, deprecationlist) -> {
deprecationlist.add(deprecationWarning);
return new AggregatorFactories.Builder().addAggregator(new MaxAggregationBuilder("field").field("field"));
});
assertThat(deprecations, hasItem(deprecationWarning));
DatafeedConfig spiedConfig = spy(datafeed);
spiedConfig.getAggDeprecations();
verify(spiedConfig).getAggDeprecations(DatafeedConfig.lazyAggParser);
}
public void testGetQueryDeprecations() {
DatafeedConfig datafeed = createDatafeedWithDateHistogram("1h");
String deprecationWarning = "Warning";
List<String> deprecations = datafeed.getQueryDeprecations((map, id, deprecationlist) -> {
deprecationlist.add(deprecationWarning);
return new BoolQueryBuilder();
});
assertThat(deprecations, hasItem(deprecationWarning));
DatafeedConfig spiedConfig = spy(datafeed);
spiedConfig.getQueryDeprecations();
verify(spiedConfig).getQueryDeprecations(DatafeedConfig.lazyQueryParser);
}
public void testSerializationOfComplexAggs() throws IOException {
MaxAggregationBuilder maxTime = AggregationBuilders.max("timestamp").field("timestamp");
AvgAggregationBuilder avgAggregationBuilder = AggregationBuilders.avg("bytes_in_avg").field("system.network.in.bytes");

View File

@ -7,11 +7,13 @@ package org.elasticsearch.xpack.core.ml.utils;
import org.elasticsearch.common.ParsingException;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentParseException;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.aggregations.AggregationBuilders;
@ -20,14 +22,18 @@ import org.elasticsearch.search.aggregations.metrics.MaxAggregationBuilder;
import org.elasticsearch.test.ESTestCase;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.elasticsearch.common.xcontent.ToXContent.EMPTY_PARAMS;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertToXContentEquivalent;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.hasSize;
public class XContentObjectTransformerTests extends ESTestCase {
@ -104,6 +110,26 @@ public class XContentObjectTransformerTests extends ESTestCase {
queryBuilderTransformer.toMap(queryBuilder));
}
public void testDeprecationWarnings() throws IOException {
XContentObjectTransformer<QueryBuilder> queryBuilderTransformer = new XContentObjectTransformer<>(NamedXContentRegistry.EMPTY,
(p)-> {
p.getDeprecationHandler().usedDeprecatedField("oldField", "newField");
p.getDeprecationHandler().usedDeprecatedName("oldName", "modernName");
return new BoolQueryBuilder();
});
List<String> deprecations = new ArrayList<>();
queryBuilderTransformer.fromMap(Collections.singletonMap("bool", "match"), deprecations);
assertThat(deprecations, hasSize(2));
assertThat(deprecations, hasItem("Deprecated field [oldField] used, replaced by [newField]"));
assertThat(deprecations, hasItem("Deprecated field [oldName] used, expected [modernName] instead"));
}
@Override
protected boolean enableWarningsCheck() {
return false;
}
private void assertXContentAreEqual(ToXContentObject object, Map<String, Object> map) throws IOException {
XContentType xContentType = XContentType.JSON;
BytesReference objectReference = XContentHelper.toXContent(object, xContentType, EMPTY_PARAMS, false);

View File

@ -5,6 +5,7 @@
*/
package org.elasticsearch.xpack.ml.action;
import org.elasticsearch.common.Strings;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.ResourceAlreadyExistsException;
@ -47,7 +48,9 @@ import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.datafeed.DatafeedManager;
import org.elasticsearch.xpack.ml.datafeed.DatafeedNodeSelector;
import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory;
import org.elasticsearch.xpack.ml.notifications.Auditor;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
@ -66,18 +69,20 @@ public class TransportStartDatafeedAction extends TransportMasterNodeAction<Star
private final Client client;
private final XPackLicenseState licenseState;
private final PersistentTasksService persistentTasksService;
private final Auditor auditor;
@Inject
public TransportStartDatafeedAction(TransportService transportService, ThreadPool threadPool,
ClusterService clusterService, XPackLicenseState licenseState,
PersistentTasksService persistentTasksService,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
Client client) {
Client client, Auditor auditor) {
super(StartDatafeedAction.NAME, transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver,
StartDatafeedAction.Request::new);
this.licenseState = licenseState;
this.persistentTasksService = persistentTasksService;
this.client = client;
this.auditor = auditor;
}
static void validate(String datafeedId, MlMetadata mlMetadata, PersistentTasksCustomMetaData tasks) {
@ -98,6 +103,19 @@ public class TransportStartDatafeedAction extends TransportMasterNodeAction<Star
}
}
//Get the deprecation warnings from the parsed query and aggs to audit
static void auditDeprecations(DatafeedConfig datafeed, Job job, Auditor auditor) {
List<String> deprecationWarnings = new ArrayList<>();
deprecationWarnings.addAll(datafeed.getAggDeprecations());
deprecationWarnings.addAll(datafeed.getQueryDeprecations());
if (deprecationWarnings.isEmpty() == false) {
String msg = "datafeed [" + datafeed.getId() +"] configuration has deprecations. [" +
Strings.collectionToDelimitedString(deprecationWarnings, ", ") + "]";
auditor.warning(job.getId(), msg);
}
}
@Override
protected String executor() {
// This api doesn't do heavy or blocking operations (just delegates PersistentTasksService),
@ -142,6 +160,8 @@ public class TransportStartDatafeedAction extends TransportMasterNodeAction<Star
DatafeedConfig datafeed = mlMetadata.getDatafeed(params.getDatafeedId());
Job job = mlMetadata.getJobs().get(datafeed.getJobId());
auditDeprecations(datafeed, job, auditor);
if (RemoteClusterLicenseChecker.containsRemoteIndex(datafeed.getIndices())) {
final RemoteClusterLicenseChecker remoteClusterLicenseChecker =
new RemoteClusterLicenseChecker(client, XPackLicenseState::isMachineLearningAllowedForOperationMode);

View File

@ -18,6 +18,7 @@ import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.config.JobState;
import org.elasticsearch.xpack.ml.datafeed.DatafeedManager;
import org.elasticsearch.xpack.ml.datafeed.DatafeedManagerTests;
import org.elasticsearch.xpack.ml.notifications.Auditor;
import java.util.Collections;
import java.util.Date;
@ -25,6 +26,12 @@ import java.util.Date;
import static org.elasticsearch.persistent.PersistentTasksCustomMetaData.INITIAL_ASSIGNMENT;
import static org.elasticsearch.xpack.ml.action.TransportOpenJobActionTests.addJobTask;
import static org.hamcrest.Matchers.equalTo;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
public class TransportStartDatafeedActionTests extends ESTestCase {
@ -85,6 +92,35 @@ public class TransportStartDatafeedActionTests extends ESTestCase {
TransportStartDatafeedAction.validate("foo-datafeed", mlMetadata2, tasks);
}
public void testDeprecationsLogged() {
Job job1 = DatafeedManagerTests.createDatafeedJob().build(new Date());
DatafeedConfig.Builder datafeedConfig = DatafeedManagerTests.createDatafeedConfig("start-data-feed-test", job1.getId());
DatafeedConfig config = spy(datafeedConfig.build());
doReturn(Collections.singletonList("Deprecated Agg")).when(config).getAggDeprecations();
doReturn(Collections.singletonList("Deprecated Query")).when(config).getQueryDeprecations();
Auditor auditor = mock(Auditor.class);
TransportStartDatafeedAction.auditDeprecations(config, job1, auditor);
verify(auditor).warning(job1.getId(),
"datafeed [start-data-feed-test] configuration has deprecations. [Deprecated Agg, Deprecated Query]");
}
public void testNoDeprecationsLogged() {
Job job1 = DatafeedManagerTests.createDatafeedJob().build(new Date());
DatafeedConfig.Builder datafeedConfig = DatafeedManagerTests.createDatafeedConfig("start-data-feed-test", job1.getId());
DatafeedConfig config = spy(datafeedConfig.build());
doReturn(Collections.emptyList()).when(config).getAggDeprecations();
doReturn(Collections.emptyList()).when(config).getQueryDeprecations();
Auditor auditor = mock(Auditor.class);
TransportStartDatafeedAction.auditDeprecations(config, job1, auditor);
verify(auditor, never()).warning(any(), any());
}
public static TransportStartDatafeedAction.DatafeedTask createDatafeedTask(long id, String type, String action,
TaskId parentTaskId,
StartDatafeedAction.DatafeedParams params,