[ML] Removes direct dependency on Jackson Parser (elastic/x-pack-elasticsearch#810)
* [ML] Removes direct dependency on Jackson Parser The classes that read data from the post data action ready to write to the autodetect process had a direct dependency on Jackson’s `Parser` class. This change makes those classes depend on XContent instead, making them consistent with the way we parse requests and data across Elasticsearch and X-Pack. * Simplify JSON record reader This commit removes the unnecessary `AbstractJsonRecordReader` and `JsonRecordReader` interfaces/classes. These are not required, as we have, and should only ever have, one implementation for reading JSON records. Original commit: elastic/x-pack-elasticsearch@366b8af943
This commit is contained in:
parent
7756067e5d
commit
dde0570296
|
@ -123,12 +123,12 @@ import org.elasticsearch.xpack.ml.rest.validate.RestValidateDetectorAction;
|
|||
import org.elasticsearch.xpack.ml.rest.validate.RestValidateJobConfigAction;
|
||||
import org.elasticsearch.xpack.persistent.CompletionPersistentTaskAction;
|
||||
import org.elasticsearch.xpack.persistent.CreatePersistentTaskAction;
|
||||
import org.elasticsearch.xpack.persistent.PersistentTasksClusterService;
|
||||
import org.elasticsearch.xpack.persistent.PersistentTasksNodeService;
|
||||
import org.elasticsearch.xpack.persistent.PersistentTasksExecutorRegistry;
|
||||
import org.elasticsearch.xpack.persistent.PersistentTaskRequest;
|
||||
import org.elasticsearch.xpack.persistent.PersistentTasksService;
|
||||
import org.elasticsearch.xpack.persistent.PersistentTasksClusterService;
|
||||
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData;
|
||||
import org.elasticsearch.xpack.persistent.PersistentTasksExecutorRegistry;
|
||||
import org.elasticsearch.xpack.persistent.PersistentTasksNodeService;
|
||||
import org.elasticsearch.xpack.persistent.PersistentTasksService;
|
||||
import org.elasticsearch.xpack.persistent.RemovePersistentTaskAction;
|
||||
import org.elasticsearch.xpack.persistent.StartPersistentTaskAction;
|
||||
import org.elasticsearch.xpack.persistent.UpdatePersistentTaskStatusAction;
|
||||
|
@ -295,7 +295,7 @@ public class MachineLearning implements ActionPlugin {
|
|||
PersistentTasksService persistentTasksService = new PersistentTasksService(Settings.EMPTY, clusterService, internalClient);
|
||||
AutodetectProcessManager autodetectProcessManager = new AutodetectProcessManager(settings, internalClient, threadPool,
|
||||
jobManager, jobProvider, jobResultsPersister, jobDataCountsPersister, autodetectProcessFactory,
|
||||
normalizerFactory, persistentTasksService);
|
||||
normalizerFactory, persistentTasksService, xContentRegistry);
|
||||
DatafeedJobRunner datafeedJobRunner = new DatafeedJobRunner(threadPool, internalClient, clusterService, jobProvider,
|
||||
System::currentTimeMillis, persistentTasksService, auditor);
|
||||
InvalidLicenseEnforcer invalidLicenseEnforcer =
|
||||
|
|
|
@ -11,6 +11,7 @@ import org.elasticsearch.ElasticsearchException;
|
|||
import org.elasticsearch.ElasticsearchStatusException;
|
||||
import org.elasticsearch.common.CheckedSupplier;
|
||||
import org.elasticsearch.common.logging.Loggers;
|
||||
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
|
||||
import org.elasticsearch.rest.RestStatus;
|
||||
import org.elasticsearch.xpack.ml.job.config.DataDescription;
|
||||
import org.elasticsearch.xpack.ml.job.config.DetectionRule;
|
||||
|
@ -53,15 +54,19 @@ public class AutodetectCommunicator implements Closeable {
|
|||
private final Consumer<Exception> handler;
|
||||
|
||||
final AtomicReference<CountDownLatch> inUse = new AtomicReference<>();
|
||||
private NamedXContentRegistry xContentRegistry;
|
||||
|
||||
public AutodetectCommunicator(long taskId, Job job, AutodetectProcess process, DataCountsReporter dataCountsReporter,
|
||||
AutoDetectResultProcessor autoDetectResultProcessor, Consumer<Exception> handler) {
|
||||
public AutodetectCommunicator(long taskId, Job job, AutodetectProcess process,
|
||||
DataCountsReporter dataCountsReporter,
|
||||
AutoDetectResultProcessor autoDetectResultProcessor, Consumer<Exception> handler,
|
||||
NamedXContentRegistry xContentRegistry) {
|
||||
this.taskId = taskId;
|
||||
this.job = job;
|
||||
this.autodetectProcess = process;
|
||||
this.dataCountsReporter = dataCountsReporter;
|
||||
this.autoDetectResultProcessor = autoDetectResultProcessor;
|
||||
this.handler = handler;
|
||||
this.xContentRegistry = xContentRegistry;
|
||||
}
|
||||
|
||||
public void writeJobInputHeader() throws IOException {
|
||||
|
@ -69,8 +74,9 @@ public class AutodetectCommunicator implements Closeable {
|
|||
}
|
||||
|
||||
private DataToProcessWriter createProcessWriter(Optional<DataDescription> dataDescription) {
|
||||
return DataToProcessWriterFactory.create(true, autodetectProcess, dataDescription.orElse(job.getDataDescription()),
|
||||
job.getAnalysisConfig(), dataCountsReporter);
|
||||
return DataToProcessWriterFactory.create(true, autodetectProcess,
|
||||
dataDescription.orElse(job.getDataDescription()), job.getAnalysisConfig(),
|
||||
dataCountsReporter, xContentRegistry);
|
||||
}
|
||||
|
||||
public DataCounts writeToJob(InputStream inputStream, DataLoadParams params) throws IOException {
|
||||
|
|
|
@ -14,6 +14,7 @@ import org.elasticsearch.common.component.AbstractComponent;
|
|||
import org.elasticsearch.common.settings.Setting;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
|
||||
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
|
||||
import org.elasticsearch.rest.RestStatus;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.xpack.ml.MachineLearning;
|
||||
|
@ -80,14 +81,17 @@ public class AutodetectProcessManager extends AbstractComponent {
|
|||
|
||||
private final int maxAllowedRunningJobs;
|
||||
|
||||
public AutodetectProcessManager(Settings settings, Client client, ThreadPool threadPool, JobManager jobManager,
|
||||
JobProvider jobProvider, JobResultsPersister jobResultsPersister,
|
||||
JobDataCountsPersister jobDataCountsPersister,
|
||||
AutodetectProcessFactory autodetectProcessFactory, NormalizerFactory normalizerFactory,
|
||||
PersistentTasksService persistentTasksService) {
|
||||
private NamedXContentRegistry xContentRegistry;
|
||||
|
||||
public AutodetectProcessManager(Settings settings, Client client, ThreadPool threadPool,
|
||||
JobManager jobManager, JobProvider jobProvider, JobResultsPersister jobResultsPersister,
|
||||
JobDataCountsPersister jobDataCountsPersister,
|
||||
AutodetectProcessFactory autodetectProcessFactory, NormalizerFactory normalizerFactory,
|
||||
PersistentTasksService persistentTasksService, NamedXContentRegistry xContentRegistry) {
|
||||
super(settings);
|
||||
this.client = client;
|
||||
this.threadPool = threadPool;
|
||||
this.xContentRegistry = xContentRegistry;
|
||||
this.maxAllowedRunningJobs = MAX_RUNNING_JOBS_PER_NODE.get(settings);
|
||||
this.autodetectProcessFactory = autodetectProcessFactory;
|
||||
this.normalizerFactory = normalizerFactory;
|
||||
|
@ -275,7 +279,8 @@ public class AutodetectProcessManager extends AbstractComponent {
|
|||
}
|
||||
throw e;
|
||||
}
|
||||
return new AutodetectCommunicator(taskId, job, process, dataCountsReporter, processor, handler);
|
||||
return new AutodetectCommunicator(taskId, job, process, dataCountsReporter, processor,
|
||||
handler, xContentRegistry);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -1,90 +0,0 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.ml.job.process.autodetect.writer;
|
||||
|
||||
import com.fasterxml.jackson.core.JsonParseException;
|
||||
import com.fasterxml.jackson.core.JsonParser;
|
||||
import com.fasterxml.jackson.core.JsonToken;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.elasticsearch.ElasticsearchParseException;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
|
||||
abstract class AbstractJsonRecordReader implements JsonRecordReader {
|
||||
static final int PARSE_ERRORS_LIMIT = 100;
|
||||
|
||||
// NORELEASE - Remove direct dependency on Jackson
|
||||
protected final JsonParser parser;
|
||||
protected final Map<String, Integer> fieldMap;
|
||||
protected final Logger logger;
|
||||
protected int nestedLevel;
|
||||
protected long fieldCount;
|
||||
protected int errorCounter;
|
||||
|
||||
/**
|
||||
* Create a reader that parses the mapped fields from JSON.
|
||||
*
|
||||
* @param parser
|
||||
* The JSON parser
|
||||
* @param fieldMap
|
||||
* Map to field name to record array index position
|
||||
* @param logger
|
||||
* the logger
|
||||
*/
|
||||
AbstractJsonRecordReader(JsonParser parser, Map<String, Integer> fieldMap, Logger logger) {
|
||||
this.parser = Objects.requireNonNull(parser);
|
||||
this.fieldMap = Objects.requireNonNull(fieldMap);
|
||||
this.logger = Objects.requireNonNull(logger);
|
||||
}
|
||||
|
||||
protected void initArrays(String[] record, boolean[] gotFields) {
|
||||
Arrays.fill(gotFields, false);
|
||||
Arrays.fill(record, "");
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns null at the EOF or the next token
|
||||
*/
|
||||
protected JsonToken tryNextTokenOrReadToEndOnError() throws IOException {
|
||||
try {
|
||||
return parser.nextToken();
|
||||
} catch (JsonParseException e) {
|
||||
logger.warn("Attempting to recover from malformed JSON data.", e);
|
||||
for (int i = 0; i <= nestedLevel; ++i) {
|
||||
readToEndOfObject();
|
||||
}
|
||||
clearNestedLevel();
|
||||
}
|
||||
|
||||
return parser.getCurrentToken();
|
||||
}
|
||||
|
||||
protected abstract void clearNestedLevel();
|
||||
|
||||
/**
|
||||
* In some cases the parser doesn't recognise the '}' of a badly formed
|
||||
* JSON document and so may skip to the end of the second document. In this
|
||||
* case we lose an extra record.
|
||||
*/
|
||||
protected void readToEndOfObject() throws IOException {
|
||||
JsonToken token = null;
|
||||
do {
|
||||
try {
|
||||
token = parser.nextToken();
|
||||
} catch (JsonParseException e) {
|
||||
++errorCounter;
|
||||
if (errorCounter >= PARSE_ERRORS_LIMIT) {
|
||||
logger.error("Failed to recover from malformed JSON data.", e);
|
||||
throw new ElasticsearchParseException("The input JSON data is malformed.");
|
||||
}
|
||||
}
|
||||
}
|
||||
while (token != JsonToken.END_OBJECT);
|
||||
}
|
||||
}
|
|
@ -5,16 +5,15 @@
|
|||
*/
|
||||
package org.elasticsearch.xpack.ml.job.process.autodetect.writer;
|
||||
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
|
||||
import org.elasticsearch.xpack.ml.job.config.AnalysisConfig;
|
||||
import org.elasticsearch.xpack.ml.job.config.DataDescription;
|
||||
import org.elasticsearch.xpack.ml.job.process.DataCountsReporter;
|
||||
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcess;
|
||||
|
||||
/**
|
||||
* Factory for creating the suitable writer depending on
|
||||
* whether the data format is JSON or not, and on the kind
|
||||
* of date transformation that should occur.
|
||||
* Factory for creating the suitable writer depending on whether the data format
|
||||
* is JSON or not, and on the kind of date transformation that should occur.
|
||||
*/
|
||||
public final class DataToProcessWriterFactory {
|
||||
|
||||
|
@ -23,22 +22,23 @@ public final class DataToProcessWriterFactory {
|
|||
}
|
||||
|
||||
/**
|
||||
* Constructs a {@link DataToProcessWriter} depending on
|
||||
* the data format and the time transformation.
|
||||
* Constructs a {@link DataToProcessWriter} depending on the data format and
|
||||
* the time transformation.
|
||||
*
|
||||
* @return A {@link JsonDataToProcessWriter} if the data
|
||||
* format is JSON or otherwise a {@link CsvDataToProcessWriter}
|
||||
* @return A {@link JsonDataToProcessWriter} if the data format is JSON or
|
||||
* otherwise a {@link CsvDataToProcessWriter}
|
||||
*/
|
||||
public static DataToProcessWriter create(boolean includeControlField, AutodetectProcess autodetectProcess,
|
||||
DataDescription dataDescription, AnalysisConfig analysisConfig,
|
||||
DataCountsReporter dataCountsReporter) {
|
||||
public static DataToProcessWriter create(boolean includeControlField,
|
||||
AutodetectProcess autodetectProcess, DataDescription dataDescription,
|
||||
AnalysisConfig analysisConfig, DataCountsReporter dataCountsReporter,
|
||||
NamedXContentRegistry xContentRegistry) {
|
||||
switch (dataDescription.getFormat()) {
|
||||
case JSON:
|
||||
return new JsonDataToProcessWriter(includeControlField, autodetectProcess, dataDescription, analysisConfig,
|
||||
dataCountsReporter);
|
||||
return new JsonDataToProcessWriter(includeControlField, autodetectProcess,
|
||||
dataDescription, analysisConfig, dataCountsReporter, xContentRegistry);
|
||||
case DELIMITED:
|
||||
return new CsvDataToProcessWriter(includeControlField, autodetectProcess, dataDescription, analysisConfig,
|
||||
dataCountsReporter);
|
||||
return new CsvDataToProcessWriter(includeControlField, autodetectProcess,
|
||||
dataDescription, analysisConfig, dataCountsReporter);
|
||||
default:
|
||||
throw new IllegalArgumentException();
|
||||
}
|
||||
|
|
|
@ -5,15 +5,17 @@
|
|||
*/
|
||||
package org.elasticsearch.xpack.ml.job.process.autodetect.writer;
|
||||
|
||||
import com.fasterxml.jackson.core.JsonFactory;
|
||||
import com.fasterxml.jackson.core.JsonParser;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.elasticsearch.common.logging.Loggers;
|
||||
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
|
||||
import org.elasticsearch.common.xcontent.XContentFactory;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
import org.elasticsearch.common.xcontent.XContentType;
|
||||
import org.elasticsearch.xpack.ml.job.config.AnalysisConfig;
|
||||
import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts;
|
||||
import org.elasticsearch.xpack.ml.job.config.DataDescription;
|
||||
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcess;
|
||||
import org.elasticsearch.xpack.ml.job.process.DataCountsReporter;
|
||||
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcess;
|
||||
import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
|
@ -32,10 +34,14 @@ import java.util.Map;
|
|||
class JsonDataToProcessWriter extends AbstractDataToProcessWriter {
|
||||
|
||||
private static final Logger LOGGER = Loggers.getLogger(JsonDataToProcessWriter.class);
|
||||
private NamedXContentRegistry xContentRegistry;
|
||||
|
||||
JsonDataToProcessWriter(boolean includeControlField, AutodetectProcess autodetectProcess, DataDescription dataDescription,
|
||||
AnalysisConfig analysisConfig, DataCountsReporter dataCountsReporter) {
|
||||
super(includeControlField, autodetectProcess, dataDescription, analysisConfig, dataCountsReporter, LOGGER);
|
||||
JsonDataToProcessWriter(boolean includeControlField, AutodetectProcess autodetectProcess,
|
||||
DataDescription dataDescription, AnalysisConfig analysisConfig,
|
||||
DataCountsReporter dataCountsReporter, NamedXContentRegistry xContentRegistry) {
|
||||
super(includeControlField, autodetectProcess, dataDescription, analysisConfig,
|
||||
dataCountsReporter, LOGGER);
|
||||
this.xContentRegistry = xContentRegistry;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -49,7 +55,8 @@ class JsonDataToProcessWriter extends AbstractDataToProcessWriter {
|
|||
public DataCounts write(InputStream inputStream) throws IOException {
|
||||
dataCountsReporter.startNewIncrementalCount();
|
||||
|
||||
try (JsonParser parser = new JsonFactory().createParser(inputStream)) {
|
||||
try (XContentParser parser = XContentFactory.xContent(XContentType.JSON)
|
||||
.createParser(xContentRegistry, inputStream)) {
|
||||
writeJson(parser);
|
||||
|
||||
// this line can throw and will be propagated
|
||||
|
@ -59,7 +66,7 @@ class JsonDataToProcessWriter extends AbstractDataToProcessWriter {
|
|||
return dataCountsReporter.incrementalStats();
|
||||
}
|
||||
|
||||
private void writeJson(JsonParser parser) throws IOException {
|
||||
private void writeJson(XContentParser parser) throws IOException {
|
||||
Collection<String> analysisFields = inputFields();
|
||||
|
||||
buildFieldIndexMapping(analysisFields.toArray(new String[0]));
|
||||
|
@ -71,7 +78,8 @@ class JsonDataToProcessWriter extends AbstractDataToProcessWriter {
|
|||
// We never expect to get the control field
|
||||
boolean[] gotFields = new boolean[analysisFields.size()];
|
||||
|
||||
JsonRecordReader recordReader = new SimpleJsonRecordReader(parser, inFieldIndexes, LOGGER);
|
||||
SimpleJsonRecordReader recordReader = new SimpleJsonRecordReader(parser, inFieldIndexes,
|
||||
LOGGER);
|
||||
long inputFieldCount = recordReader.read(input, gotFields);
|
||||
while (inputFieldCount >= 0) {
|
||||
Arrays.fill(record, "");
|
||||
|
|
|
@ -1,25 +0,0 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.ml.job.process.autodetect.writer;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* Interface for classes that read the various styles of JSON inputIndex.
|
||||
*/
|
||||
interface JsonRecordReader {
|
||||
/**
|
||||
* Read some JSON and write to the record array.
|
||||
*
|
||||
* @param record Read fields are written to this array. This array is first filled with empty
|
||||
* strings and will never contain a <code>null</code>
|
||||
* @param gotFields boolean array each element is true if that field
|
||||
* was read
|
||||
* @return The number of fields in the JSON doc or -1 if nothing was read
|
||||
* because the end of the stream was reached
|
||||
*/
|
||||
long read(String[] record, boolean[] gotFields) throws IOException;
|
||||
}
|
|
@ -5,17 +5,28 @@
|
|||
*/
|
||||
package org.elasticsearch.xpack.ml.job.process.autodetect.writer;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayDeque;
|
||||
import java.util.Deque;
|
||||
import java.util.Map;
|
||||
import com.fasterxml.jackson.core.JsonParseException;
|
||||
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.elasticsearch.ElasticsearchParseException;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
|
||||
import com.fasterxml.jackson.core.JsonParser;
|
||||
import com.fasterxml.jackson.core.JsonToken;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayDeque;
|
||||
import java.util.Arrays;
|
||||
import java.util.Deque;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
|
||||
class SimpleJsonRecordReader extends AbstractJsonRecordReader {
|
||||
class SimpleJsonRecordReader {
|
||||
static final int PARSE_ERRORS_LIMIT = 100;
|
||||
|
||||
protected final XContentParser parser;
|
||||
protected final Map<String, Integer> fieldMap;
|
||||
protected final Logger logger;
|
||||
protected int nestedLevel;
|
||||
protected long fieldCount;
|
||||
protected int errorCounter;
|
||||
private Deque<String> nestedFields;
|
||||
private String nestedPrefix;
|
||||
|
||||
|
@ -29,8 +40,10 @@ class SimpleJsonRecordReader extends AbstractJsonRecordReader {
|
|||
* @param logger
|
||||
* logger
|
||||
*/
|
||||
SimpleJsonRecordReader(JsonParser parser, Map<String, Integer> fieldMap, Logger logger) {
|
||||
super(parser, fieldMap, logger);
|
||||
SimpleJsonRecordReader(XContentParser parser, Map<String, Integer> fieldMap, Logger logger) {
|
||||
this.parser = Objects.requireNonNull(parser);
|
||||
this.fieldMap = Objects.requireNonNull(fieldMap);
|
||||
this.logger = Objects.requireNonNull(logger);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -50,25 +63,24 @@ class SimpleJsonRecordReader extends AbstractJsonRecordReader {
|
|||
* @return The number of fields in the JSON doc or -1 if nothing was read
|
||||
* because the end of the stream was reached
|
||||
*/
|
||||
@Override
|
||||
public long read(String[] record, boolean[] gotFields) throws IOException {
|
||||
initArrays(record, gotFields);
|
||||
fieldCount = 0;
|
||||
clearNestedLevel();
|
||||
|
||||
JsonToken token = tryNextTokenOrReadToEndOnError();
|
||||
while (!(token == JsonToken.END_OBJECT && nestedLevel == 0)) {
|
||||
XContentParser.Token token = tryNextTokenOrReadToEndOnError();
|
||||
while (!(token == XContentParser.Token.END_OBJECT && nestedLevel == 0)) {
|
||||
if (token == null) {
|
||||
break;
|
||||
}
|
||||
|
||||
if (token == JsonToken.END_OBJECT) {
|
||||
if (token == XContentParser.Token.END_OBJECT) {
|
||||
--nestedLevel;
|
||||
String objectFieldName = nestedFields.pop();
|
||||
|
||||
int lastIndex = nestedPrefix.length() - objectFieldName.length() - 1;
|
||||
nestedPrefix = nestedPrefix.substring(0, lastIndex);
|
||||
} else if (token == JsonToken.FIELD_NAME) {
|
||||
} else if (token == XContentParser.Token.FIELD_NAME) {
|
||||
parseFieldValuePair(record, gotFields);
|
||||
}
|
||||
|
||||
|
@ -82,7 +94,6 @@ class SimpleJsonRecordReader extends AbstractJsonRecordReader {
|
|||
return fieldCount;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void clearNestedLevel() {
|
||||
nestedLevel = 0;
|
||||
nestedFields = new ArrayDeque<String>();
|
||||
|
@ -90,19 +101,19 @@ class SimpleJsonRecordReader extends AbstractJsonRecordReader {
|
|||
}
|
||||
|
||||
private void parseFieldValuePair(String[] record, boolean[] gotFields) throws IOException {
|
||||
String fieldName = parser.getCurrentName();
|
||||
JsonToken token = tryNextTokenOrReadToEndOnError();
|
||||
String fieldName = parser.currentName();
|
||||
XContentParser.Token token = tryNextTokenOrReadToEndOnError();
|
||||
|
||||
if (token == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (token == JsonToken.START_OBJECT) {
|
||||
if (token == XContentParser.Token.START_OBJECT) {
|
||||
++nestedLevel;
|
||||
nestedFields.push(fieldName);
|
||||
nestedPrefix = nestedPrefix + fieldName + ".";
|
||||
} else {
|
||||
if (token == JsonToken.START_ARRAY || token.isScalarValue()) {
|
||||
if (token == XContentParser.Token.START_ARRAY || token.isValue()) {
|
||||
++fieldCount;
|
||||
|
||||
// Only do the donkey work of converting the field value to a
|
||||
|
@ -118,15 +129,15 @@ class SimpleJsonRecordReader extends AbstractJsonRecordReader {
|
|||
}
|
||||
}
|
||||
|
||||
private String parseSingleFieldValue(JsonToken token) throws IOException {
|
||||
if (token == JsonToken.START_ARRAY) {
|
||||
private String parseSingleFieldValue(XContentParser.Token token) throws IOException {
|
||||
if (token == XContentParser.Token.START_ARRAY) {
|
||||
// Convert any scalar values in the array to a comma delimited
|
||||
// string. (Arrays of more complex objects are ignored.)
|
||||
StringBuilder strBuilder = new StringBuilder();
|
||||
boolean needComma = false;
|
||||
while (token != JsonToken.END_ARRAY) {
|
||||
while (token != XContentParser.Token.END_ARRAY) {
|
||||
token = tryNextTokenOrReadToEndOnError();
|
||||
if (token.isScalarValue()) {
|
||||
if (token.isValue()) {
|
||||
if (needComma) {
|
||||
strBuilder.append(',');
|
||||
} else {
|
||||
|
@ -142,16 +153,16 @@ class SimpleJsonRecordReader extends AbstractJsonRecordReader {
|
|||
return tokenToString(token);
|
||||
}
|
||||
|
||||
private void skipSingleFieldValue(JsonToken token) throws IOException {
|
||||
private void skipSingleFieldValue(XContentParser.Token token) throws IOException {
|
||||
// Scalar values don't need any extra skip code
|
||||
if (token == JsonToken.START_ARRAY) {
|
||||
if (token == XContentParser.Token.START_ARRAY) {
|
||||
// Consume the whole array but do nothing with it
|
||||
int arrayDepth = 1;
|
||||
do {
|
||||
token = tryNextTokenOrReadToEndOnError();
|
||||
if (token == JsonToken.END_ARRAY) {
|
||||
if (token == XContentParser.Token.END_ARRAY) {
|
||||
--arrayDepth;
|
||||
} else if (token == JsonToken.START_ARRAY) {
|
||||
} else if (token == XContentParser.Token.START_ARRAY) {
|
||||
++arrayDepth;
|
||||
}
|
||||
}
|
||||
|
@ -165,10 +176,52 @@ class SimpleJsonRecordReader extends AbstractJsonRecordReader {
|
|||
* product treats them (which in turn is shaped by the fact that CSV
|
||||
* cannot distinguish empty string and null).
|
||||
*/
|
||||
private String tokenToString(JsonToken token) throws IOException {
|
||||
if (token == null || token == JsonToken.VALUE_NULL) {
|
||||
private String tokenToString(XContentParser.Token token) throws IOException {
|
||||
if (token == null || token == XContentParser.Token.VALUE_NULL) {
|
||||
return "";
|
||||
}
|
||||
return parser.getText();
|
||||
return parser.text();
|
||||
}
|
||||
|
||||
protected void initArrays(String[] record, boolean[] gotFields) {
|
||||
Arrays.fill(gotFields, false);
|
||||
Arrays.fill(record, "");
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns null at the EOF or the next token
|
||||
*/
|
||||
protected XContentParser.Token tryNextTokenOrReadToEndOnError() throws IOException {
|
||||
try {
|
||||
return parser.nextToken();
|
||||
} catch (JsonParseException e) {
|
||||
logger.warn("Attempting to recover from malformed JSON data.", e);
|
||||
for (int i = 0; i <= nestedLevel; ++i) {
|
||||
readToEndOfObject();
|
||||
}
|
||||
clearNestedLevel();
|
||||
}
|
||||
|
||||
return parser.currentToken();
|
||||
}
|
||||
|
||||
/**
|
||||
* In some cases the parser doesn't recognise the '}' of a badly formed JSON
|
||||
* document and so may skip to the end of the second document. In this case
|
||||
* we lose an extra record.
|
||||
*/
|
||||
protected void readToEndOfObject() throws IOException {
|
||||
XContentParser.Token token = null;
|
||||
do {
|
||||
try {
|
||||
token = parser.nextToken();
|
||||
} catch (JsonParseException e) {
|
||||
++errorCounter;
|
||||
if (errorCounter >= PARSE_ERRORS_LIMIT) {
|
||||
logger.error("Failed to recover from malformed JSON data.", e);
|
||||
throw new ElasticsearchParseException("The input JSON data is malformed.");
|
||||
}
|
||||
}
|
||||
} while (token != XContentParser.Token.END_OBJECT);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -7,6 +7,7 @@ package org.elasticsearch.xpack.ml.job.process.autodetect;
|
|||
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.ElasticsearchStatusException;
|
||||
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.xpack.ml.job.config.AnalysisConfig;
|
||||
import org.elasticsearch.xpack.ml.job.config.DataDescription;
|
||||
|
@ -148,8 +149,9 @@ public class AutodetectCommunicatorTests extends ESTestCase {
|
|||
return null;
|
||||
}).when(executorService).execute(any(Runnable.class));
|
||||
DataCountsReporter dataCountsReporter = mock(DataCountsReporter.class);
|
||||
return new AutodetectCommunicator(0L, createJobDetails(), autodetectProcess, dataCountsReporter, autoDetectResultProcessor,
|
||||
e -> {});
|
||||
return new AutodetectCommunicator(0L, createJobDetails(), autodetectProcess,
|
||||
dataCountsReporter, autoDetectResultProcessor, e -> {
|
||||
}, new NamedXContentRegistry(Collections.emptyList()));
|
||||
}
|
||||
|
||||
public void testWriteToJobInUse() throws IOException {
|
||||
|
|
|
@ -11,6 +11,7 @@ import org.elasticsearch.common.CheckedConsumer;
|
|||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.util.concurrent.EsExecutors;
|
||||
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
|
||||
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.xpack.ml.MachineLearning;
|
||||
|
@ -161,7 +162,9 @@ public class AutodetectProcessManagerTests extends ESTestCase {
|
|||
Settings.Builder settings = Settings.builder();
|
||||
settings.put(AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE.getKey(), 3);
|
||||
AutodetectProcessManager manager = spy(new AutodetectProcessManager(settings.build(), client, threadPool, jobManager, jobProvider,
|
||||
jobResultsPersister, jobDataCountsPersister, autodetectProcessFactory, normalizerFactory, persistentTasksService));
|
||||
jobResultsPersister, jobDataCountsPersister, autodetectProcessFactory,
|
||||
normalizerFactory, persistentTasksService,
|
||||
new NamedXContentRegistry(Collections.emptyList())));
|
||||
|
||||
DataCounts dataCounts = new DataCounts("foo");
|
||||
ModelSnapshot modelSnapshot = new ModelSnapshot.Builder("foo").build();
|
||||
|
@ -323,7 +326,9 @@ public class AutodetectProcessManagerTests extends ESTestCase {
|
|||
AutodetectProcess autodetectProcess = mock(AutodetectProcess.class);
|
||||
AutodetectProcessFactory autodetectProcessFactory = (j, modelSnapshot, quantiles, filters, i, e) -> autodetectProcess;
|
||||
AutodetectProcessManager manager = new AutodetectProcessManager(Settings.EMPTY, client, threadPool, jobManager, jobProvider,
|
||||
jobResultsPersister, jobDataCountsPersister, autodetectProcessFactory, normalizerFactory, persistentTasksService);
|
||||
jobResultsPersister, jobDataCountsPersister, autodetectProcessFactory,
|
||||
normalizerFactory, persistentTasksService,
|
||||
new NamedXContentRegistry(Collections.emptyList()));
|
||||
|
||||
expectThrows(EsRejectedExecutionException.class,
|
||||
() -> manager.create("my_id", 1L, dataCounts, modelSnapshot, quantiles, filters, false, e -> {}));
|
||||
|
@ -340,8 +345,10 @@ public class AutodetectProcessManagerTests extends ESTestCase {
|
|||
PersistentTasksService persistentTasksService) {
|
||||
ThreadPool threadPool = mock(ThreadPool.class);
|
||||
AutodetectProcessFactory autodetectProcessFactory = mock(AutodetectProcessFactory.class);
|
||||
AutodetectProcessManager manager = new AutodetectProcessManager(Settings.EMPTY, client, threadPool, jobManager, jobProvider,
|
||||
jobResultsPersister, jobDataCountsPersister, autodetectProcessFactory, normalizerFactory, persistentTasksService);
|
||||
AutodetectProcessManager manager = new AutodetectProcessManager(Settings.EMPTY, client,
|
||||
threadPool, jobManager, jobProvider, jobResultsPersister, jobDataCountsPersister,
|
||||
autodetectProcessFactory, normalizerFactory, persistentTasksService,
|
||||
new NamedXContentRegistry(Collections.emptyList()));
|
||||
manager = spy(manager);
|
||||
doReturn(communicator).when(manager)
|
||||
.create(any(), anyLong(), eq(dataCounts), eq(modelSnapshot), eq(quantiles), eq(filters), anyBoolean(), any());
|
||||
|
|
|
@ -5,6 +5,7 @@
|
|||
*/
|
||||
package org.elasticsearch.xpack.ml.job.process.autodetect.writer;
|
||||
|
||||
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.xpack.ml.job.config.AnalysisConfigTests;
|
||||
import org.elasticsearch.xpack.ml.job.config.DataDescription;
|
||||
|
@ -12,6 +13,8 @@ import org.elasticsearch.xpack.ml.job.config.DataDescription.DataFormat;
|
|||
import org.elasticsearch.xpack.ml.job.process.DataCountsReporter;
|
||||
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcess;
|
||||
|
||||
import java.util.Collections;
|
||||
|
||||
import static org.mockito.Mockito.mock;
|
||||
|
||||
public class DataToProcessWriterFactoryTests extends ESTestCase {
|
||||
|
@ -30,7 +33,8 @@ public class DataToProcessWriterFactoryTests extends ESTestCase {
|
|||
}
|
||||
|
||||
private static DataToProcessWriter createWriter(DataDescription dataDescription) {
|
||||
return DataToProcessWriterFactory.create(true, mock(AutodetectProcess.class), dataDescription,
|
||||
AnalysisConfigTests.createRandomized().build(), mock(DataCountsReporter.class));
|
||||
return DataToProcessWriterFactory.create(true, mock(AutodetectProcess.class),
|
||||
dataDescription, AnalysisConfigTests.createRandomized().build(),
|
||||
mock(DataCountsReporter.class), new NamedXContentRegistry(Collections.emptyList()));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -7,6 +7,7 @@ package org.elasticsearch.xpack.ml.job.process.autodetect.writer;
|
|||
|
||||
import org.elasticsearch.ElasticsearchParseException;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.xpack.ml.job.config.AnalysisConfig;
|
||||
import org.elasticsearch.xpack.ml.job.config.DataDescription;
|
||||
|
@ -25,6 +26,7 @@ import java.io.InputStream;
|
|||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
import static org.mockito.Matchers.any;
|
||||
|
@ -283,7 +285,7 @@ public class JsonDataToProcessWriterTests extends ESTestCase {
|
|||
|
||||
private JsonDataToProcessWriter createWriter() {
|
||||
return new JsonDataToProcessWriter(true, autodetectProcess, dataDescription.build(), analysisConfig,
|
||||
dataCountsReporter);
|
||||
dataCountsReporter, new NamedXContentRegistry(Collections.emptyList()));
|
||||
}
|
||||
|
||||
private void assertWrittenRecordsEqualTo(List<String[]> expectedRecords) {
|
||||
|
|
|
@ -5,11 +5,14 @@
|
|||
*/
|
||||
package org.elasticsearch.xpack.ml.job.process.autodetect.writer;
|
||||
|
||||
import com.fasterxml.jackson.core.JsonFactory;
|
||||
import com.fasterxml.jackson.core.JsonParseException;
|
||||
import com.fasterxml.jackson.core.JsonParser;
|
||||
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.elasticsearch.ElasticsearchParseException;
|
||||
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
|
||||
import org.elasticsearch.common.xcontent.XContentFactory;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
import org.elasticsearch.common.xcontent.XContentType;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.xpack.ml.job.process.CountingInputStream;
|
||||
import org.elasticsearch.xpack.ml.job.process.DataCountsReporter;
|
||||
|
@ -18,6 +21,7 @@ import java.io.ByteArrayInputStream;
|
|||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Locale;
|
||||
import java.util.Map;
|
||||
|
@ -27,7 +31,7 @@ import static org.mockito.Mockito.mock;
|
|||
public class SimpleJsonRecordReaderTests extends ESTestCase {
|
||||
public void testRead() throws JsonParseException, IOException {
|
||||
String data = "{\"a\":10, \"b\":20, \"c\":30}\n{\"b\":21, \"a\":11, \"c\":31}\n";
|
||||
JsonParser parser = createParser(data);
|
||||
XContentParser parser = createParser(data);
|
||||
Map<String, Integer> fieldMap = createFieldMap();
|
||||
|
||||
SimpleJsonRecordReader reader = new SimpleJsonRecordReader(parser, fieldMap, mock(Logger.class));
|
||||
|
@ -51,7 +55,7 @@ public class SimpleJsonRecordReaderTests extends ESTestCase {
|
|||
|
||||
public void testRead_GivenNestedField() throws JsonParseException, IOException {
|
||||
String data = "{\"a\":10, \"b\":20, \"c\":{\"d\":30, \"e\":40}}";
|
||||
JsonParser parser = createParser(data);
|
||||
XContentParser parser = createParser(data);
|
||||
Map<String, Integer> fieldMap = new HashMap<>();
|
||||
fieldMap.put("a", 0);
|
||||
fieldMap.put("b", 1);
|
||||
|
@ -73,7 +77,7 @@ public class SimpleJsonRecordReaderTests extends ESTestCase {
|
|||
|
||||
public void testRead_GivenSingleValueArrays() throws JsonParseException, IOException {
|
||||
String data = "{\"a\":[10], \"b\":20, \"c\":{\"d\":30, \"e\":[40]}}";
|
||||
JsonParser parser = createParser(data);
|
||||
XContentParser parser = createParser(data);
|
||||
Map<String, Integer> fieldMap = new HashMap<>();
|
||||
fieldMap.put("a", 0);
|
||||
fieldMap.put("b", 1);
|
||||
|
@ -95,7 +99,7 @@ public class SimpleJsonRecordReaderTests extends ESTestCase {
|
|||
|
||||
public void testRead_GivenMultiValueArrays() throws JsonParseException, IOException {
|
||||
String data = "{\"a\":[10, 11], \"b\":20, \"c\":{\"d\":30, \"e\":[40, 50]}, \"f\":[\"a\", \"a\", \"a\", \"a\"], \"g\":20}";
|
||||
JsonParser parser = createParser(data);
|
||||
XContentParser parser = createParser(data);
|
||||
Map<String, Integer> fieldMap = new HashMap<>();
|
||||
fieldMap.put("a", 0);
|
||||
fieldMap.put("g", 1);
|
||||
|
@ -123,7 +127,7 @@ public class SimpleJsonRecordReaderTests extends ESTestCase {
|
|||
public void testRead_RecoverFromBadJson() throws JsonParseException, IOException {
|
||||
// no opening '{'
|
||||
String data = "\"a\":10, \"b\":20, \"c\":30}\n{\"b\":21, \"a\":11, \"c\":31}\n{\"c\":32, \"b\":22, \"a\":12}";
|
||||
JsonParser parser = createParser(data);
|
||||
XContentParser parser = createParser(data);
|
||||
Map<String, Integer> fieldMap = createFieldMap();
|
||||
|
||||
SimpleJsonRecordReader reader = new SimpleJsonRecordReader(parser, fieldMap, mock(Logger.class));
|
||||
|
@ -145,7 +149,7 @@ public class SimpleJsonRecordReaderTests extends ESTestCase {
|
|||
// nested object 'd' is missing a ','
|
||||
String data = "{\"a\":10, \"b\":20, \"c\":30}\n" +
|
||||
"{\"b\":21, \"d\" : {\"ee\": 1 \"ff\":0}, \"a\":11, \"c\":31}";
|
||||
JsonParser parser = createParser(data);
|
||||
XContentParser parser = createParser(data);
|
||||
Map<String, Integer> fieldMap = createFieldMap();
|
||||
|
||||
SimpleJsonRecordReader reader = new SimpleJsonRecordReader(parser, fieldMap, mock(Logger.class));
|
||||
|
@ -173,7 +177,7 @@ public class SimpleJsonRecordReaderTests extends ESTestCase {
|
|||
builder.append(String.format(Locale.ROOT, format, i));
|
||||
}
|
||||
|
||||
JsonParser parser = createParser(builder.toString());
|
||||
XContentParser parser = createParser(builder.toString());
|
||||
Map<String, Integer> fieldMap = createFieldMap();
|
||||
|
||||
SimpleJsonRecordReader reader = new SimpleJsonRecordReader(parser, fieldMap, mock(Logger.class));
|
||||
|
@ -197,7 +201,7 @@ public class SimpleJsonRecordReaderTests extends ESTestCase {
|
|||
+ "\n{\"b\":21, \"a\":11, \"c\":31}"
|
||||
+ "\n{\"c\":32, \"b\":22, \"a\":12}\n";
|
||||
|
||||
JsonParser parser = createParser(data);
|
||||
XContentParser parser = createParser(data);
|
||||
Map<String, Integer> fieldMap = createFieldMap();
|
||||
|
||||
SimpleJsonRecordReader reader = new SimpleJsonRecordReader(parser, fieldMap, mock(Logger.class));
|
||||
|
@ -210,10 +214,11 @@ public class SimpleJsonRecordReaderTests extends ESTestCase {
|
|||
assertEquals(3, reader.read(record, gotFields));
|
||||
}
|
||||
|
||||
private JsonParser createParser(String input) throws JsonParseException, IOException {
|
||||
private XContentParser createParser(String input) throws JsonParseException, IOException {
|
||||
ByteArrayInputStream inputStream = new ByteArrayInputStream(input.getBytes(StandardCharsets.UTF_8));
|
||||
InputStream inputStream2 = new CountingInputStream(inputStream, mock(DataCountsReporter.class));
|
||||
return new JsonFactory().createParser(inputStream2);
|
||||
return XContentFactory.xContent(XContentType.JSON)
|
||||
.createParser(new NamedXContentRegistry(Collections.emptyList()), inputStream2);
|
||||
}
|
||||
|
||||
private Map<String, Integer> createFieldMap() {
|
||||
|
|
Loading…
Reference in New Issue