diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/PostDataAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/PostDataAction.java index 3e875e56b03..3dc28f54302 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/PostDataAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/PostDataAction.java @@ -22,6 +22,7 @@ import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.StatusToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; @@ -135,6 +136,7 @@ public class PostDataAction extends Action Messages.getMessage(Messages.JOB_DATA_CONCURRENT_USE_UPLOAD, job.getId()), () -> { if (params.isResettingBuckets()) { autodetectProcess.writeResetBucketsControlMessage(params); @@ -87,7 +89,7 @@ public class AutodetectCommunicator implements Closeable { CountingInputStream countingStream = new CountingInputStream(inputStream, dataCountsReporter); DataToProcessWriter autoDetectWriter = createProcessWriter(params.getDataDescription()); - DataCounts results = autoDetectWriter.write(countingStream); + DataCounts results = autoDetectWriter.write(countingStream, xContentType); autoDetectWriter.flush(); return results; }, false); diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java index e88e3f14305..c150a646985 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java @@ -16,6 +16,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.ml.MachineLearning; @@ -135,16 +136,18 @@ public class AutodetectProcessManager extends AbstractComponent { * * @param jobId the jobId * @param input Data input stream + * @param xContentType the {@link XContentType} of the input * @param params Data processing parameters * @return Count of records, fields, bytes, etc written */ - public DataCounts processData(String jobId, InputStream input, DataLoadParams params) { + public DataCounts processData(String jobId, InputStream input, XContentType xContentType, + DataLoadParams params) { AutodetectCommunicator communicator = autoDetectCommunicatorByJob.get(jobId); if (communicator == null) { throw new IllegalStateException("[" + jobId + "] Cannot process data: no active autodetect process for job"); } try { - return communicator.writeToJob(input, params); + return communicator.writeToJob(input, xContentType, params); // TODO check for errors from autodetect } catch (IOException e) { String msg = String.format(Locale.ROOT, "Exception writing to process for job %s", jobId); diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/AbstractDataToProcessWriter.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/AbstractDataToProcessWriter.java index e39e439cee3..ff98d42da4b 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/AbstractDataToProcessWriter.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/AbstractDataToProcessWriter.java @@ -76,11 +76,11 @@ public abstract class AbstractDataToProcessWriter implements DataToProcessWriter } /** - * Set up the field index mappings. - * This must be called before {@linkplain DataToProcessWriter#write(java.io.InputStream)}. + * Set up the field index mappings. This must be called before + * {@linkplain DataToProcessWriter#write(java.io.InputStream, org.elasticsearch.common.xcontent.XContentType)}. *

- * Finds the required input indexes in the header - * and sets the mappings to the corresponding output indexes. + * Finds the required input indexes in the header and sets the + * mappings to the corresponding output indexes. */ void buildFieldIndexMapping(String[] header) throws IOException { Collection inputFields = inputFields(); diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/CsvDataToProcessWriter.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/CsvDataToProcessWriter.java index 13957d70d5b..651f8fddd67 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/CsvDataToProcessWriter.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/CsvDataToProcessWriter.java @@ -7,11 +7,12 @@ package org.elasticsearch.xpack.ml.job.process.autodetect.writer; import org.apache.logging.log4j.Logger; import org.elasticsearch.common.logging.Loggers; +import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.xpack.ml.job.config.AnalysisConfig; -import org.elasticsearch.xpack.ml.job.process.DataCountsReporter; -import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts; import org.elasticsearch.xpack.ml.job.config.DataDescription; +import org.elasticsearch.xpack.ml.job.process.DataCountsReporter; import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcess; +import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts; import org.supercsv.io.CsvListReader; import org.supercsv.prefs.CsvPreference; @@ -65,7 +66,7 @@ class CsvDataToProcessWriter extends AbstractDataToProcessWriter { * header a exception is thrown */ @Override - public DataCounts write(InputStream inputStream) throws IOException { + public DataCounts write(InputStream inputStream, XContentType xContentType) throws IOException { CsvPreference csvPref = new CsvPreference.Builder( dataDescription.getQuoteCharacter(), dataDescription.getFieldDelimiter(), diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/DataToProcessWriter.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/DataToProcessWriter.java index a09be8e1015..9bc7c72045c 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/DataToProcessWriter.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/DataToProcessWriter.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.xpack.ml.job.process.autodetect.writer; +import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts; import java.io.IOException; @@ -32,7 +33,7 @@ public interface DataToProcessWriter { * * @return Counts of the records processed, bytes read etc */ - DataCounts write(InputStream inputStream) throws IOException; + DataCounts write(InputStream inputStream, XContentType xContentType) throws IOException; /** * Flush the outputstream diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/DataToProcessWriterFactory.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/DataToProcessWriterFactory.java index f0dd49aa2d1..9b18a838545 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/DataToProcessWriterFactory.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/DataToProcessWriterFactory.java @@ -33,7 +33,7 @@ public final class DataToProcessWriterFactory { AnalysisConfig analysisConfig, DataCountsReporter dataCountsReporter, NamedXContentRegistry xContentRegistry) { switch (dataDescription.getFormat()) { - case JSON: + case XCONTENT: return new JsonDataToProcessWriter(includeControlField, autodetectProcess, dataDescription, analysisConfig, dataCountsReporter, xContentRegistry); case DELIMITED: diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/JsonDataToProcessWriter.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/JsonDataToProcessWriter.java index cfb0fa27169..62ef533de5d 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/JsonDataToProcessWriter.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/JsonDataToProcessWriter.java @@ -52,10 +52,10 @@ class JsonDataToProcessWriter extends AbstractDataToProcessWriter { * timeField is missing from the JOSN inputIndex an exception is thrown */ @Override - public DataCounts write(InputStream inputStream) throws IOException { + public DataCounts write(InputStream inputStream, XContentType xContentType) throws IOException { dataCountsReporter.startNewIncrementalCount(); - try (XContentParser parser = XContentFactory.xContent(XContentType.JSON) + try (XContentParser parser = XContentFactory.xContent(xContentType) .createParser(xContentRegistry, inputStream)) { writeJson(parser); @@ -78,7 +78,7 @@ class JsonDataToProcessWriter extends AbstractDataToProcessWriter { // We never expect to get the control field boolean[] gotFields = new boolean[analysisFields.size()]; - SimpleJsonRecordReader recordReader = new SimpleJsonRecordReader(parser, inFieldIndexes, + XContentRecordReader recordReader = new XContentRecordReader(parser, inFieldIndexes, LOGGER); long inputFieldCount = recordReader.read(input, gotFields); while (inputFieldCount >= 0) { diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/SimpleJsonRecordReader.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/XContentRecordReader.java similarity index 90% rename from plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/SimpleJsonRecordReader.java rename to plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/XContentRecordReader.java index ec7e92b7f37..3613bdb0648 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/SimpleJsonRecordReader.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/XContentRecordReader.java @@ -18,7 +18,7 @@ import java.util.Deque; import java.util.Map; import java.util.Objects; -class SimpleJsonRecordReader { +class XContentRecordReader { static final int PARSE_ERRORS_LIMIT = 100; protected final XContentParser parser; @@ -40,28 +40,30 @@ class SimpleJsonRecordReader { * @param logger * logger */ - SimpleJsonRecordReader(XContentParser parser, Map fieldMap, Logger logger) { + XContentRecordReader(XContentParser parser, Map fieldMap, Logger logger) { this.parser = Objects.requireNonNull(parser); this.fieldMap = Objects.requireNonNull(fieldMap); this.logger = Objects.requireNonNull(logger); } /** - * Read the JSON object and write to the record array. - * Nested objects are flattened with the field names separated by - * a '.'. - * e.g. for a record with a nested 'tags' object: - * "{"name":"my.test.metric1","tags":{"tag1":"blah","tag2":"boo"},"time":1350824400,"value":12345.678}" - * use 'tags.tag1' to reference the tag1 field in the nested object + * Read the JSON object and write to the record array. Nested objects are + * flattened with the field names separated by a '.'. e.g. for a record with + * a nested 'tags' object: + * "{"name":"my.test.metric1","tags":{"tag1":"blah","tag2":"boo"}, + * "time":1350824400,"value":12345.678}" use 'tags.tag1' to reference the + * tag1 field in the nested object *

* Array fields in the JSON are ignored * - * @param record Read fields are written to this array. This array is first filled with empty - * strings and will never contain a null - * @param gotFields boolean array each element is true if that field - * was read + * @param record + * Read fields are written to this array. This array is first + * filled with empty strings and will never contain a + * null + * @param gotFields + * boolean array each element is true if that field was read * @return The number of fields in the JSON doc or -1 if nothing was read - * because the end of the stream was reached + * because the end of the stream was reached */ public long read(String[] record, boolean[] gotFields) throws IOException { initArrays(record, gotFields); diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/rest/job/RestPostDataAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/rest/job/RestPostDataAction.java index d079cca978b..ba26977da7d 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/rest/job/RestPostDataAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/rest/job/RestPostDataAction.java @@ -33,7 +33,7 @@ public class RestPostDataAction extends BaseRestHandler { PostDataAction.Request request = new PostDataAction.Request(restRequest.param(Job.ID.getPreferredName())); request.setResetStart(restRequest.param(PostDataAction.Request.RESET_START.getPreferredName(), DEFAULT_RESET_START)); request.setResetEnd(restRequest.param(PostDataAction.Request.RESET_END.getPreferredName(), DEFAULT_RESET_END)); - request.setContent(restRequest.content()); + request.setContent(restRequest.content(), restRequest.getXContentType()); return channel -> client.execute(PostDataAction.INSTANCE, request, new RestStatusToXContentListener<>(channel)); } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/PostDataActionRequestTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/PostDataActionRequestTests.java index 0f4544214a8..aa58a04eb41 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/PostDataActionRequestTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/PostDataActionRequestTests.java @@ -5,6 +5,10 @@ */ package org.elasticsearch.xpack.ml.action; +import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.xpack.ml.job.config.DataDescription; +import org.elasticsearch.xpack.ml.job.config.DataDescription.DataFormat; import org.elasticsearch.xpack.ml.support.AbstractStreamableTestCase; public class PostDataActionRequestTests extends AbstractStreamableTestCase { @@ -17,6 +21,14 @@ public class PostDataActionRequestTests extends AbstractStreamableTestCase flushJobRequests; private long currentTime; + private XContentType xContentType; @Before @SuppressWarnings("unchecked") @@ -57,19 +60,22 @@ public class DatafeedJobTests extends ESTestCase { when(dataExtractorFactory.newExtractor(anyLong(), anyLong())).thenReturn(dataExtractor); client = mock(Client.class); dataDescription = new DataDescription.Builder(); - dataDescription.setFormat(DataDescription.DataFormat.JSON); + dataDescription.setFormat(DataDescription.DataFormat.XCONTENT); ActionFuture jobDataFuture = mock(ActionFuture.class); flushJobFuture = mock(ActionFuture.class); currentTime = 0; + xContentType = XContentType.JSON; when(dataExtractor.hasNext()).thenReturn(true).thenReturn(false); - InputStream inputStream = new ByteArrayInputStream("content".getBytes(StandardCharsets.UTF_8)); + byte[] contentBytes = "content".getBytes(StandardCharsets.UTF_8); + InputStream inputStream = new ByteArrayInputStream(contentBytes); when(dataExtractor.next()).thenReturn(Optional.of(inputStream)); DataCounts dataCounts = new DataCounts("_job_id", 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, new Date(0), new Date(0), new Date(0), new Date(0), new Date(0)); PostDataAction.Request expectedRequest = new PostDataAction.Request("_job_id"); expectedRequest.setDataDescription(dataDescription.build()); + expectedRequest.setContent(new BytesArray(contentBytes), xContentType); when(client.execute(same(PostDataAction.INSTANCE), eq(expectedRequest))).thenReturn(jobDataFuture); when(jobDataFuture.actionGet()).thenReturn(new PostDataAction.Response(dataCounts)); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/BasicDistributedJobsIT.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/BasicDistributedJobsIT.java index 056961f1814..98ea012190b 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/BasicDistributedJobsIT.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/BasicDistributedJobsIT.java @@ -11,6 +11,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.test.junit.annotations.TestLogging; import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.MlMetadata; @@ -325,7 +326,7 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase { postDataRequest.setContent(new BytesArray( "{\"airline\":\"AAL\",\"responsetime\":\"132.2046\",\"sourcetype\":\"farequote\",\"time\":\"1403481600\"}\n" + "{\"airline\":\"JZA\",\"responsetime\":\"990.4628\",\"sourcetype\":\"farequote\",\"time\":\"1403481700\"}" - )); + ), XContentType.JSON); PostDataAction.Response response = client().execute(PostDataAction.INSTANCE, postDataRequest).actionGet(); assertEquals(2, response.getDataCounts().getProcessedRecordCount()); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedJobIT.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedJobIT.java index 27ceb2c7681..4b4d8ed5496 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedJobIT.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedJobIT.java @@ -488,7 +488,8 @@ public class DatafeedJobIT extends ESRestTestCase { String job = "{\n" + " \"description\":\"Analysis of response time by airline\",\n" + " \"analysis_config\" : {\n" + " \"bucket_span\":\"1h\",\n" + " \"detectors\" :[{\"function\":\"mean\",\"field_name\":\"responsetime\",\"by_field_name\":\"airline\"}]\n" - + " },\n" + " \"data_description\" : {\n" + " \"format\":\"JSON\",\n" + + " },\n" + " \"data_description\" : {\n" + + " \"format\":\"xcontent\",\n" + " \"time_field\":\"time stamp\",\n" + " \"time_format\":\"yyyy-MM-dd'T'HH:mm:ssX\"\n" + " }\n" + "}"; return client().performRequest("put", MachineLearning.BASE_PATH + "anomaly_detectors/" + id, diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/config/DataDescriptionTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/config/DataDescriptionTests.java index 077ea5da903..2337bd82fe6 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/config/DataDescriptionTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/config/DataDescriptionTests.java @@ -25,7 +25,7 @@ public class DataDescriptionTests extends AbstractSerializingTestCase communicator.writeToJob(in, mock(DataLoadParams.class))); + () -> communicator.writeToJob(in, xContentType, mock(DataLoadParams.class))); communicator.inUse.set(null); - communicator.writeToJob(in, new DataLoadParams(TimeRange.builder().build(), Optional.empty())); + communicator.writeToJob(in, xContentType, + new DataLoadParams(TimeRange.builder().build(), Optional.empty())); } public void testFlushInUse() throws IOException { diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java index 20322a34745..db74717c24f 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java @@ -12,6 +12,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.ml.job.JobManager; @@ -67,9 +68,10 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; /** - * Calling the {@link AutodetectProcessManager#processData(String, InputStream, DataLoadParams)} - * method causes an AutodetectCommunicator to be created on demand. Most of these tests have to - * do that before they can assert other things + * Calling the + * {@link AutodetectProcessManager#processData(String, InputStream, XContentType, DataLoadParams)} + * method causes an AutodetectCommunicator to be created on demand. Most of + * these tests have to do that before they can assert other things */ public class AutodetectProcessManagerTests extends ESTestCase { @@ -203,7 +205,8 @@ public class AutodetectProcessManagerTests extends ESTestCase { DataLoadParams params = new DataLoadParams(TimeRange.builder().build(), Optional.empty()); manager.openJob("foo", 1L, false, e -> {}); - manager.processData("foo", createInputStream(""), params); + manager.processData("foo", createInputStream(""), randomFrom(XContentType.values()), + params); assertEquals(1, manager.numberOfOpenJobs()); } @@ -213,11 +216,13 @@ public class AutodetectProcessManagerTests extends ESTestCase { DataLoadParams params = mock(DataLoadParams.class); InputStream inputStream = createInputStream(""); - doThrow(new IOException("blah")).when(communicator).writeToJob(inputStream, params); + XContentType xContentType = randomFrom(XContentType.values()); + doThrow(new IOException("blah")).when(communicator).writeToJob(inputStream, + xContentType, params); manager.openJob("foo", 1L, false, e -> {}); ESTestCase.expectThrows(ElasticsearchException.class, - () -> manager.processData("foo", inputStream, params)); + () -> manager.processData("foo", inputStream, xContentType, params)); } public void testCloseJob() { @@ -226,7 +231,8 @@ public class AutodetectProcessManagerTests extends ESTestCase { assertEquals(0, manager.numberOfOpenJobs()); manager.openJob("foo", 1L, false, e -> {}); - manager.processData("foo", createInputStream(""), mock(DataLoadParams.class)); + manager.processData("foo", createInputStream(""), randomFrom(XContentType.values()), + mock(DataLoadParams.class)); // job is created assertEquals(1, manager.numberOfOpenJobs()); @@ -237,12 +243,13 @@ public class AutodetectProcessManagerTests extends ESTestCase { public void testBucketResetMessageIsSent() throws IOException { AutodetectCommunicator communicator = mock(AutodetectCommunicator.class); AutodetectProcessManager manager = createManager(communicator); + XContentType xContentType = randomFrom(XContentType.values()); DataLoadParams params = new DataLoadParams(TimeRange.builder().startTime("1000").endTime("2000").build(), Optional.empty()); InputStream inputStream = createInputStream(""); manager.openJob("foo", 1L, false, e -> {}); - manager.processData("foo", inputStream, params); - verify(communicator).writeToJob(inputStream, params); + manager.processData("foo", inputStream, xContentType, params); + verify(communicator).writeToJob(inputStream, xContentType, params); } public void testFlush() throws IOException { @@ -251,7 +258,8 @@ public class AutodetectProcessManagerTests extends ESTestCase { InputStream inputStream = createInputStream(""); manager.openJob("foo", 1L, false, e -> {}); - manager.processData("foo", inputStream, mock(DataLoadParams.class)); + manager.processData("foo", inputStream, randomFrom(XContentType.values()), + mock(DataLoadParams.class)); InterimResultsParams params = InterimResultsParams.builder().build(); manager.flushJob("foo", params); @@ -287,7 +295,8 @@ public class AutodetectProcessManagerTests extends ESTestCase { assertFalse(manager.jobHasActiveAutodetectProcess("foo")); manager.openJob("foo", 1L, false, e -> {}); - manager.processData("foo", createInputStream(""), mock(DataLoadParams.class)); + manager.processData("foo", createInputStream(""), randomFrom(XContentType.values()), + mock(DataLoadParams.class)); assertTrue(manager.jobHasActiveAutodetectProcess("foo")); assertFalse(manager.jobHasActiveAutodetectProcess("bar")); @@ -295,12 +304,13 @@ public class AutodetectProcessManagerTests extends ESTestCase { public void testProcessData_GivenStateNotOpened() throws IOException { AutodetectCommunicator communicator = mock(AutodetectCommunicator.class); - when(communicator.writeToJob(any(), any())).thenReturn(new DataCounts("foo")); + when(communicator.writeToJob(any(), any(), any())).thenReturn(new DataCounts("foo")); AutodetectProcessManager manager = createManager(communicator); InputStream inputStream = createInputStream(""); manager.openJob("foo", 1L, false, e -> {}); - DataCounts dataCounts = manager.processData("foo", inputStream, mock(DataLoadParams.class)); + DataCounts dataCounts = manager.processData("foo", inputStream, + randomFrom(XContentType.values()), mock(DataLoadParams.class)); assertThat(dataCounts, equalTo(new DataCounts("foo"))); } @@ -357,7 +367,8 @@ public class AutodetectProcessManagerTests extends ESTestCase { private AutodetectProcessManager createManagerAndCallProcessData(AutodetectCommunicator communicator, String jobId) { AutodetectProcessManager manager = createManager(communicator); manager.openJob(jobId, 1L, false, e -> {}); - manager.processData(jobId, createInputStream(""), mock(DataLoadParams.class)); + manager.processData(jobId, createInputStream(""), randomFrom(XContentType.values()), + mock(DataLoadParams.class)); return manager; } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/CsvDataToProcessWriterTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/CsvDataToProcessWriterTests.java index 311c35d1dc9..141579633d1 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/CsvDataToProcessWriterTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/CsvDataToProcessWriterTests.java @@ -8,12 +8,12 @@ package org.elasticsearch.xpack.ml.job.process.autodetect.writer; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xpack.ml.job.config.AnalysisConfig; -import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts; import org.elasticsearch.xpack.ml.job.config.DataDescription; import org.elasticsearch.xpack.ml.job.config.DataDescription.DataFormat; import org.elasticsearch.xpack.ml.job.config.Detector; -import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcess; import org.elasticsearch.xpack.ml.job.process.DataCountsReporter; +import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcess; +import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts; import org.junit.Before; import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; @@ -79,7 +79,7 @@ public class CsvDataToProcessWriterTests extends ESTestCase { InputStream inputStream = createInputStream(input.toString()); CsvDataToProcessWriter writer = createWriter(); writer.writeHeader(); - writer.write(inputStream); + writer.write(inputStream, null); verify(dataCountsReporter, times(1)).startNewIncrementalCount(); List expectedRecords = new ArrayList<>(); @@ -101,7 +101,7 @@ public class CsvDataToProcessWriterTests extends ESTestCase { InputStream inputStream = createInputStream(input.toString()); CsvDataToProcessWriter writer = createWriter(); writer.writeHeader(); - writer.write(inputStream); + writer.write(inputStream, null); verify(dataCountsReporter, times(1)).startNewIncrementalCount(); List expectedRecords = new ArrayList<>(); @@ -125,7 +125,7 @@ public class CsvDataToProcessWriterTests extends ESTestCase { when(dataCountsReporter.getLatestRecordTime()).thenReturn(new Date(5000L)); CsvDataToProcessWriter writer = createWriter(); writer.writeHeader(); - writer.write(inputStream); + writer.write(inputStream, null); verify(dataCountsReporter, times(1)).startNewIncrementalCount(); List expectedRecords = new ArrayList<>(); @@ -155,7 +155,7 @@ public class CsvDataToProcessWriterTests extends ESTestCase { InputStream inputStream = createInputStream(input.toString()); CsvDataToProcessWriter writer = createWriter(); writer.writeHeader(); - writer.write(inputStream); + writer.write(inputStream, null); verify(dataCountsReporter, times(1)).startNewIncrementalCount(); List expectedRecords = new ArrayList<>(); @@ -188,7 +188,7 @@ public class CsvDataToProcessWriterTests extends ESTestCase { InputStream inputStream = createInputStream(input.toString()); CsvDataToProcessWriter writer = createWriter(); writer.writeHeader(); - writer.write(inputStream); + writer.write(inputStream, null); verify(dataCountsReporter, times(1)).startNewIncrementalCount(); List expectedRecords = new ArrayList<>(); @@ -220,7 +220,7 @@ public class CsvDataToProcessWriterTests extends ESTestCase { CsvDataToProcessWriter writer = createWriter(); writer.writeHeader(); - DataCounts counts = writer.write(inputStream); + DataCounts counts = writer.write(inputStream, null); assertEquals(0L, counts.getInputBytes()); assertEquals(0L, counts.getInputRecordCount()); } @@ -238,7 +238,8 @@ public class CsvDataToProcessWriterTests extends ESTestCase { CsvDataToProcessWriter writer = createWriter(); writer.writeHeader(); - SuperCsvException e = ESTestCase.expectThrows(SuperCsvException.class, () -> writer.write(inputStream)); + SuperCsvException e = ESTestCase.expectThrows(SuperCsvException.class, + () -> writer.write(inputStream, null)); // Expected line numbers are 2 and 10001, but SuperCSV may print the // numbers using a different locale's digit characters assertTrue(e.getMessage(), e.getMessage().matches( diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/DataToProcessWriterFactoryTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/DataToProcessWriterFactoryTests.java index 2f38a40fd52..8fafe35e712 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/DataToProcessWriterFactoryTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/DataToProcessWriterFactoryTests.java @@ -20,7 +20,7 @@ import static org.mockito.Mockito.mock; public class DataToProcessWriterFactoryTests extends ESTestCase { public void testCreate_GivenDataFormatIsJson() { DataDescription.Builder dataDescription = new DataDescription.Builder(); - dataDescription.setFormat(DataFormat.JSON); + dataDescription.setFormat(DataFormat.XCONTENT); assertTrue(createWriter(dataDescription.build()) instanceof JsonDataToProcessWriter); } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/JsonDataToProcessWriterTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/JsonDataToProcessWriterTests.java index 4210e5c8398..c6130eaa2f7 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/JsonDataToProcessWriterTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/JsonDataToProcessWriterTests.java @@ -8,13 +8,14 @@ package org.elasticsearch.xpack.ml.job.process.autodetect.writer; import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xpack.ml.job.config.AnalysisConfig; import org.elasticsearch.xpack.ml.job.config.DataDescription; import org.elasticsearch.xpack.ml.job.config.DataDescription.DataFormat; import org.elasticsearch.xpack.ml.job.config.Detector; -import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcess; import org.elasticsearch.xpack.ml.job.process.DataCountsReporter; +import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcess; import org.junit.Before; import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; @@ -64,7 +65,7 @@ public class JsonDataToProcessWriterTests extends ESTestCase { dataDescription = new DataDescription.Builder(); - dataDescription.setFormat(DataFormat.JSON); + dataDescription.setFormat(DataFormat.XCONTENT); dataDescription.setTimeFormat(DataDescription.EPOCH); Detector detector = new Detector.Builder("metric", "value").build(); @@ -78,7 +79,7 @@ public class JsonDataToProcessWriterTests extends ESTestCase { InputStream inputStream = createInputStream(input.toString()); JsonDataToProcessWriter writer = createWriter(); writer.writeHeader(); - writer.write(inputStream); + writer.write(inputStream, XContentType.JSON); verify(dataCountsReporter, times(1)).startNewIncrementalCount(); List expectedRecords = new ArrayList<>(); @@ -100,7 +101,7 @@ public class JsonDataToProcessWriterTests extends ESTestCase { InputStream inputStream = createInputStream(input.toString()); JsonDataToProcessWriter writer = createWriter(); writer.writeHeader(); - writer.write(inputStream); + writer.write(inputStream, XContentType.JSON); verify(dataCountsReporter, times(1)).startNewIncrementalCount(); List expectedRecords = new ArrayList<>(); @@ -129,7 +130,7 @@ public class JsonDataToProcessWriterTests extends ESTestCase { InputStream inputStream = createInputStream(input.toString()); JsonDataToProcessWriter writer = createWriter(); writer.writeHeader(); - writer.write(inputStream); + writer.write(inputStream, XContentType.JSON); List expectedRecords = new ArrayList<>(); // The final field is the control field @@ -158,7 +159,7 @@ public class JsonDataToProcessWriterTests extends ESTestCase { InputStream inputStream = createInputStream(input.toString()); JsonDataToProcessWriter writer = createWriter(); writer.writeHeader(); - writer.write(inputStream); + writer.write(inputStream, XContentType.JSON); verify(dataCountsReporter, times(1)).startNewIncrementalCount(); List expectedRecords = new ArrayList<>(); @@ -187,7 +188,7 @@ public class JsonDataToProcessWriterTests extends ESTestCase { InputStream inputStream = createInputStream(input.toString()); JsonDataToProcessWriter writer = createWriter(); writer.writeHeader(); - writer.write(inputStream); + writer.write(inputStream, XContentType.JSON); verify(dataCountsReporter, times(1)).startNewIncrementalCount(); List expectedRecords = new ArrayList<>(); @@ -214,7 +215,8 @@ public class JsonDataToProcessWriterTests extends ESTestCase { JsonDataToProcessWriter writer = createWriter(); writer.writeHeader(); - ESTestCase.expectThrows(ElasticsearchParseException.class, () -> writer.write(inputStream)); + ESTestCase.expectThrows(ElasticsearchParseException.class, + () -> writer.write(inputStream, XContentType.JSON)); } public void testWrite_GivenJsonWithArrayField() @@ -229,7 +231,7 @@ public class JsonDataToProcessWriterTests extends ESTestCase { InputStream inputStream = createInputStream(input.toString()); JsonDataToProcessWriter writer = createWriter(); writer.writeHeader(); - writer.write(inputStream); + writer.write(inputStream, XContentType.JSON); verify(dataCountsReporter, times(1)).startNewIncrementalCount(); List expectedRecords = new ArrayList<>(); @@ -258,7 +260,7 @@ public class JsonDataToProcessWriterTests extends ESTestCase { InputStream inputStream = createInputStream(input.toString()); JsonDataToProcessWriter writer = createWriter(); writer.writeHeader(); - writer.write(inputStream); + writer.write(inputStream, XContentType.JSON); verify(dataCountsReporter, times(1)).startNewIncrementalCount(); List expectedRecords = new ArrayList<>(); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/SimpleJsonRecordReaderTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/XContentRecordReaderTests.java similarity index 79% rename from plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/SimpleJsonRecordReaderTests.java rename to plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/XContentRecordReaderTests.java index 78f90e4c596..d0412e381ef 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/SimpleJsonRecordReaderTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/XContentRecordReaderTests.java @@ -28,13 +28,14 @@ import java.util.Map; import static org.mockito.Mockito.mock; -public class SimpleJsonRecordReaderTests extends ESTestCase { +public class XContentRecordReaderTests extends ESTestCase { public void testRead() throws JsonParseException, IOException { String data = "{\"a\":10, \"b\":20, \"c\":30}\n{\"b\":21, \"a\":11, \"c\":31}\n"; XContentParser parser = createParser(data); Map fieldMap = createFieldMap(); - SimpleJsonRecordReader reader = new SimpleJsonRecordReader(parser, fieldMap, mock(Logger.class)); + XContentRecordReader reader = new XContentRecordReader(parser, fieldMap, + mock(Logger.class)); String record[] = new String[3]; boolean gotFields[] = new boolean[3]; @@ -61,7 +62,8 @@ public class SimpleJsonRecordReaderTests extends ESTestCase { fieldMap.put("b", 1); fieldMap.put("c.e", 2); - SimpleJsonRecordReader reader = new SimpleJsonRecordReader(parser, fieldMap, mock(Logger.class)); + XContentRecordReader reader = new XContentRecordReader(parser, fieldMap, + mock(Logger.class)); String record[] = new String[3]; boolean gotFields[] = new boolean[3]; @@ -83,7 +85,8 @@ public class SimpleJsonRecordReaderTests extends ESTestCase { fieldMap.put("b", 1); fieldMap.put("c.e", 2); - SimpleJsonRecordReader reader = new SimpleJsonRecordReader(parser, fieldMap, mock(Logger.class)); + XContentRecordReader reader = new XContentRecordReader(parser, fieldMap, + mock(Logger.class)); String record[] = new String[3]; boolean gotFields[] = new boolean[3]; @@ -98,14 +101,16 @@ public class SimpleJsonRecordReaderTests extends ESTestCase { public void testRead_GivenMultiValueArrays() throws JsonParseException, IOException { - String data = "{\"a\":[10, 11], \"b\":20, \"c\":{\"d\":30, \"e\":[40, 50]}, \"f\":[\"a\", \"a\", \"a\", \"a\"], \"g\":20}"; + String data = "{\"a\":[10, 11], \"b\":20, \"c\":{\"d\":30, \"e\":[40, 50]}, " + + "\"f\":[\"a\", \"a\", \"a\", \"a\"], \"g\":20}"; XContentParser parser = createParser(data); Map fieldMap = new HashMap<>(); fieldMap.put("a", 0); fieldMap.put("g", 1); fieldMap.put("c.e", 2); - SimpleJsonRecordReader reader = new SimpleJsonRecordReader(parser, fieldMap, mock(Logger.class)); + XContentRecordReader reader = new XContentRecordReader(parser, fieldMap, + mock(Logger.class)); String record[] = new String[3]; boolean gotFields[] = new boolean[3]; @@ -119,18 +124,20 @@ public class SimpleJsonRecordReaderTests extends ESTestCase { } /** - * There's a problem with the parser where in this case it skips over the first 2 records - * instead of to the end of the first record which is invalid json. - * This means we miss the next record after a bad one. + * There's a problem with the parser where in this case it skips over the + * first 2 records instead of to the end of the first record which is + * invalid json. This means we miss the next record after a bad one. */ public void testRead_RecoverFromBadJson() throws JsonParseException, IOException { // no opening '{' - String data = "\"a\":10, \"b\":20, \"c\":30}\n{\"b\":21, \"a\":11, \"c\":31}\n{\"c\":32, \"b\":22, \"a\":12}"; + String data = "\"a\":10, \"b\":20, \"c\":30}\n{\"b\":21, \"a\":11, \"c\":31}\n" + + "{\"c\":32, \"b\":22, \"a\":12}"; XContentParser parser = createParser(data); Map fieldMap = createFieldMap(); - SimpleJsonRecordReader reader = new SimpleJsonRecordReader(parser, fieldMap, mock(Logger.class)); + XContentRecordReader reader = new XContentRecordReader(parser, fieldMap, + mock(Logger.class)); String record[] = new String[3]; boolean gotFields[] = new boolean[3]; @@ -147,12 +154,13 @@ public class SimpleJsonRecordReaderTests extends ESTestCase { public void testRead_RecoverFromBadNestedJson() throws JsonParseException, IOException { // nested object 'd' is missing a ',' - String data = "{\"a\":10, \"b\":20, \"c\":30}\n" + - "{\"b\":21, \"d\" : {\"ee\": 1 \"ff\":0}, \"a\":11, \"c\":31}"; + String data = "{\"a\":10, \"b\":20, \"c\":30}\n" + + "{\"b\":21, \"d\" : {\"ee\": 1 \"ff\":0}, \"a\":11, \"c\":31}"; XContentParser parser = createParser(data); Map fieldMap = createFieldMap(); - SimpleJsonRecordReader reader = new SimpleJsonRecordReader(parser, fieldMap, mock(Logger.class)); + XContentRecordReader reader = new XContentRecordReader(parser, fieldMap, + mock(Logger.class)); String record[] = new String[3]; boolean gotFields[] = new boolean[3]; @@ -173,23 +181,24 @@ public class SimpleJsonRecordReaderTests extends ESTestCase { // missing a ':' String format = "{\"a\":1%1$d, \"b\"2%1$d, \"c\":3%1$d}\n"; StringBuilder builder = new StringBuilder(); - for (int i = 0; i < SimpleJsonRecordReader.PARSE_ERRORS_LIMIT; i++) { + for (int i = 0; i < XContentRecordReader.PARSE_ERRORS_LIMIT; i++) { builder.append(String.format(Locale.ROOT, format, i)); } XContentParser parser = createParser(builder.toString()); Map fieldMap = createFieldMap(); - SimpleJsonRecordReader reader = new SimpleJsonRecordReader(parser, fieldMap, mock(Logger.class)); + XContentRecordReader reader = new XContentRecordReader(parser, fieldMap, + mock(Logger.class)); ESTestCase.expectThrows(ElasticsearchParseException.class, () -> readUntilError(reader)); } - private void readUntilError(SimpleJsonRecordReader reader) throws IOException { + private void readUntilError(XContentRecordReader reader) throws IOException { String record[] = new String[3]; boolean gotFields[] = new boolean[3]; // this should throw after PARSE_ERRORS_LIMIT errors - for (int i = 0; i < SimpleJsonRecordReader.PARSE_ERRORS_LIMIT; i++) { + for (int i = 0; i < XContentRecordReader.PARSE_ERRORS_LIMIT; i++) { reader.read(record, gotFields); } } @@ -198,13 +207,13 @@ public class SimpleJsonRecordReaderTests extends ESTestCase { char controlChar = '\u0002'; String data = "{\"a\":10, \"" + controlChar + "\" : 5, \"b\":20, \"c\":30}" - + "\n{\"b\":21, \"a\":11, \"c\":31}" - + "\n{\"c\":32, \"b\":22, \"a\":12}\n"; + + "\n{\"b\":21, \"a\":11, \"c\":31}" + "\n{\"c\":32, \"b\":22, \"a\":12}\n"; XContentParser parser = createParser(data); Map fieldMap = createFieldMap(); - SimpleJsonRecordReader reader = new SimpleJsonRecordReader(parser, fieldMap, mock(Logger.class)); + XContentRecordReader reader = new XContentRecordReader(parser, fieldMap, + mock(Logger.class)); String record[] = new String[3]; boolean gotFields[] = new boolean[3]; @@ -215,8 +224,10 @@ public class SimpleJsonRecordReaderTests extends ESTestCase { } private XContentParser createParser(String input) throws JsonParseException, IOException { - ByteArrayInputStream inputStream = new ByteArrayInputStream(input.getBytes(StandardCharsets.UTF_8)); - InputStream inputStream2 = new CountingInputStream(inputStream, mock(DataCountsReporter.class)); + ByteArrayInputStream inputStream = new ByteArrayInputStream( + input.getBytes(StandardCharsets.UTF_8)); + InputStream inputStream2 = new CountingInputStream(inputStream, + mock(DataCountsReporter.class)); return XContentFactory.xContent(XContentType.JSON) .createParser(new NamedXContentRegistry(Collections.emptyList()), inputStream2); } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/support/BaseMlIntegTestCase.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/support/BaseMlIntegTestCase.java index 85fef3e8ae8..ecf5294cda2 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/support/BaseMlIntegTestCase.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/support/BaseMlIntegTestCase.java @@ -116,7 +116,7 @@ public abstract class BaseMlIntegTestCase extends ESIntegTestCase { protected Job.Builder createJob(String id) { DataDescription.Builder dataDescription = new DataDescription.Builder(); - dataDescription.setFormat(DataDescription.DataFormat.JSON); + dataDescription.setFormat(DataDescription.DataFormat.XCONTENT); dataDescription.setTimeFormat(DataDescription.EPOCH_MS); Detector.Builder d = new Detector.Builder("count", null); @@ -133,7 +133,7 @@ public abstract class BaseMlIntegTestCase extends ESIntegTestCase { public static Job.Builder createFareQuoteJob(String id) { DataDescription.Builder dataDescription = new DataDescription.Builder(); - dataDescription.setFormat(DataDescription.DataFormat.JSON); + dataDescription.setFormat(DataDescription.DataFormat.XCONTENT); dataDescription.setTimeFormat(DataDescription.EPOCH); dataDescription.setTimeField("time"); @@ -153,7 +153,7 @@ public abstract class BaseMlIntegTestCase extends ESIntegTestCase { public static Job.Builder createScheduledJob(String jobId) { DataDescription.Builder dataDescription = new DataDescription.Builder(); - dataDescription.setFormat(DataDescription.DataFormat.JSON); + dataDescription.setFormat(DataDescription.DataFormat.XCONTENT); dataDescription.setTimeFormat("yyyy-MM-dd HH:mm:ss"); Detector.Builder d = new Detector.Builder("count", null); diff --git a/plugin/src/test/resources/rest-api-spec/api/xpack.ml.post_data.json b/plugin/src/test/resources/rest-api-spec/api/xpack.ml.post_data.json index 569abb91221..938e134e6d3 100644 --- a/plugin/src/test/resources/rest-api-spec/api/xpack.ml.post_data.json +++ b/plugin/src/test/resources/rest-api-spec/api/xpack.ml.post_data.json @@ -24,7 +24,8 @@ }, "body": { "description" : "The data to process", - "required" : true + "required" : true, + "serialize" : "bulk" } } } diff --git a/plugin/src/test/resources/rest-api-spec/test/ml/datafeeds_crud.yaml b/plugin/src/test/resources/rest-api-spec/test/ml/datafeeds_crud.yaml index f721e14bdb1..a5a20a699a0 100644 --- a/plugin/src/test/resources/rest-api-spec/test/ml/datafeeds_crud.yaml +++ b/plugin/src/test/resources/rest-api-spec/test/ml/datafeeds_crud.yaml @@ -10,7 +10,7 @@ setup: "detectors" :[{"function":"count"}] }, "data_description" : { - "format":"JSON", + "format":"xcontent", "time_field":"time", "time_format":"epoch" } diff --git a/plugin/src/test/resources/rest-api-spec/test/ml/get_datafeed_stats.yaml b/plugin/src/test/resources/rest-api-spec/test/ml/get_datafeed_stats.yaml index 945c5dcbe74..b591896b710 100644 --- a/plugin/src/test/resources/rest-api-spec/test/ml/get_datafeed_stats.yaml +++ b/plugin/src/test/resources/rest-api-spec/test/ml/get_datafeed_stats.yaml @@ -10,7 +10,7 @@ setup: "detectors" :[{"function":"count"}] }, "data_description" : { - "format":"JSON", + "format":"xcontent", "time_field":"time", "time_format":"epoch" } diff --git a/plugin/src/test/resources/rest-api-spec/test/ml/get_datafeeds.yaml b/plugin/src/test/resources/rest-api-spec/test/ml/get_datafeeds.yaml index 248e32e948b..5db3493d7d8 100644 --- a/plugin/src/test/resources/rest-api-spec/test/ml/get_datafeeds.yaml +++ b/plugin/src/test/resources/rest-api-spec/test/ml/get_datafeeds.yaml @@ -10,7 +10,7 @@ setup: "detectors" :[{"function":"count"}] }, "data_description" : { - "format":"JSON", + "format":"xcontent", "time_field":"time", "time_format":"epoch" } diff --git a/plugin/src/test/resources/rest-api-spec/test/ml/index_layout.yaml b/plugin/src/test/resources/rest-api-spec/test/ml/index_layout.yaml index a484fbc3768..bf5a269b5bd 100644 --- a/plugin/src/test/resources/rest-api-spec/test/ml/index_layout.yaml +++ b/plugin/src/test/resources/rest-api-spec/test/ml/index_layout.yaml @@ -17,7 +17,7 @@ setup: "detectors" :[{"function":"metric","field_name":"responsetime","by_field_name":"airline"}] }, "data_description" : { - "format":"JSON", + "format":"xcontent", "time_field":"time", "time_format":"epoch" } @@ -37,7 +37,7 @@ setup: "detectors" :[{"function":"metric","field_name":"responsetime","by_field_name":"airline"}] }, "data_description" : { - "format":"JSON", + "format":"xcontent", "time_field":"time", "time_format":"epoch" } @@ -288,7 +288,7 @@ setup: "detectors" :[{"function":"metric","field_name":"responsetime","by_field_name":"airline"}] }, "data_description" : { - "format":"JSON", + "format":"xcontent", "time_field":"time", "time_format":"epoch" } diff --git a/plugin/src/test/resources/rest-api-spec/test/ml/jobs_crud.yaml b/plugin/src/test/resources/rest-api-spec/test/ml/jobs_crud.yaml index c01e07e5c65..491a64e2921 100644 --- a/plugin/src/test/resources/rest-api-spec/test/ml/jobs_crud.yaml +++ b/plugin/src/test/resources/rest-api-spec/test/ml/jobs_crud.yaml @@ -273,7 +273,7 @@ "detectors" :[{"function":"metric","field_name":"responsetime","by_field_name":"airline"}] }, "data_description" : { - "format":"JSON", + "format":"xcontent", "time_field":"time", "time_format":"yyyy-MM-dd HH:mm:ssX" } diff --git a/plugin/src/test/resources/rest-api-spec/test/ml/jobs_get.yaml b/plugin/src/test/resources/rest-api-spec/test/ml/jobs_get.yaml index 57b63863ede..20fde410f4a 100644 --- a/plugin/src/test/resources/rest-api-spec/test/ml/jobs_get.yaml +++ b/plugin/src/test/resources/rest-api-spec/test/ml/jobs_get.yaml @@ -11,7 +11,7 @@ setup: "detectors" :[{"function":"count"}] }, "data_description" : { - "format":"JSON", + "format":"xcontent", "time_field":"time", "time_format":"epoch" } @@ -29,7 +29,7 @@ setup: "detectors" :[{"function":"metric","field_name":"responsetime","by_field_name":"airline"}] }, "data_description" : { - "format" : "JSON", + "format" : "xcontent", "time_field":"time", "time_format":"yyyy-MM-dd'T'HH:mm:ssX" } diff --git a/plugin/src/test/resources/rest-api-spec/test/ml/jobs_get_result_buckets.yaml b/plugin/src/test/resources/rest-api-spec/test/ml/jobs_get_result_buckets.yaml index 1de875e0c0b..4dc5e32a9ac 100644 --- a/plugin/src/test/resources/rest-api-spec/test/ml/jobs_get_result_buckets.yaml +++ b/plugin/src/test/resources/rest-api-spec/test/ml/jobs_get_result_buckets.yaml @@ -8,7 +8,7 @@ setup: "detectors" :[{"function":"metric","field_name":"responsetime","by_field_name":"airline"}] }, "data_description" : { - "format":"JSON", + "format":"xcontent", "time_field":"time" } } diff --git a/plugin/src/test/resources/rest-api-spec/test/ml/jobs_get_result_records.yaml b/plugin/src/test/resources/rest-api-spec/test/ml/jobs_get_result_records.yaml index d92f6e9a9bb..75f72aab16e 100644 --- a/plugin/src/test/resources/rest-api-spec/test/ml/jobs_get_result_records.yaml +++ b/plugin/src/test/resources/rest-api-spec/test/ml/jobs_get_result_records.yaml @@ -8,7 +8,7 @@ setup: "detectors" :[{"function":"metric","field_name":"responsetime","by_field_name":"airline"}] }, "data_description" : { - "format":"JSON", + "format":"xcontent", "time_field":"time" } } diff --git a/plugin/src/test/resources/rest-api-spec/test/ml/jobs_get_stats.yaml b/plugin/src/test/resources/rest-api-spec/test/ml/jobs_get_stats.yaml index 72524f46f76..dab9e42ceeb 100644 --- a/plugin/src/test/resources/rest-api-spec/test/ml/jobs_get_stats.yaml +++ b/plugin/src/test/resources/rest-api-spec/test/ml/jobs_get_stats.yaml @@ -11,7 +11,7 @@ setup: "detectors" :[{"function":"metric","field_name":"responsetime","by_field_name":"airline"}] }, "data_description" : { - "format":"JSON", + "format":"xcontent", "time_field":"time", "time_format":"epoch" } @@ -33,7 +33,7 @@ setup: "detectors" :[{"function":"metric","field_name":"responsetime","by_field_name":"airline"}] }, "data_description" : { - "format" : "JSON", + "format" : "xcontent", "time_field":"time", "time_format":"yyyy-MM-dd'T'HH:mm:ssX" } diff --git a/plugin/src/test/resources/rest-api-spec/test/ml/post_data.yaml b/plugin/src/test/resources/rest-api-spec/test/ml/post_data.yaml index 307cbcb3ae9..959b42933bf 100644 --- a/plugin/src/test/resources/rest-api-spec/test/ml/post_data.yaml +++ b/plugin/src/test/resources/rest-api-spec/test/ml/post_data.yaml @@ -11,7 +11,7 @@ setup: "detectors" :[{"function":"metric","field_name":"responsetime","by_field_name":"airline"}] }, "data_description" : { - "format":"JSON", + "format":"xcontent", "time_field":"time", "time_format":"epoch" } @@ -28,7 +28,7 @@ setup: "detectors" :[{"function":"metric","field_name":"responsetime","by_field_name":"airline"}] }, "data_description" : { - "format":"JSON" + "format":"xcontent" } } @@ -38,22 +38,24 @@ setup: --- "Test POST data job api, flush, close and verify DataCounts doc": - - skip: - features: ["headers"] - do: - #set the header so we won't randomize it - headers: - Content-Type: application/json xpack.ml.post_data: job_id: farequote - body: > - {"airline":"AAL","responsetime":"132.2046","sourcetype":"farequote","time":"1403481600"} - {"airline":"JZA","responsetime":"990.4628","sourcetype":"farequote","time":"1403481700"} + body: + - airline: AAL + responsetime: 132.2046 + sourcetype: farequote + time: 1403481600 + - airline: JZA + responsetime: 990.4628 + sourcetype: farequote + time: 1403481700 - match: { processed_record_count: 2 } - match: { processed_field_count: 4} - - match: { input_bytes: 178 } + - gte: { input_bytes: 170 } + - lte: { input_bytes: 180 } - match: { input_field_count: 6 } - match: { invalid_date_count: 0 } - match: { missing_field_count: 0 } @@ -84,7 +86,8 @@ setup: - match: { _source.processed_record_count: 2 } - match: { _source.processed_field_count: 4} - - match: { _source.input_bytes: 178 } + - gte: { _source.input_bytes: 170 } + - lte: { _source.input_bytes: 180 } - match: { _source.input_field_count: 6 } - match: { _source.invalid_date_count: 0 } - match: { _source.missing_field_count: 0 } @@ -111,41 +114,50 @@ setup: --- "Test POST data with invalid parameters": - - skip: - features: ["headers"] - do: - #set the header so we won't randomize it - headers: - Content-Type: application/json catch: missing xpack.ml.post_data: job_id: not_a_job - body: {} + body: + - airline: AAL + responsetime: 132.2046 + sourcetype: farequote + time: 1403481600 + - airline: JZA + responsetime: 990.4628 + sourcetype: farequote + time: 1403481700 - do: - #set the header so we won't randomize it - headers: - Content-Type: application/json catch: /parse_exception/ xpack.ml.post_data: job_id: farequote reset_start: not_a_date - body: > - {"airline":"AAL","responsetime":"132.2046","sourcetype":"farequote","time":"1403481600"} - {"airline":"JZA","responsetime":"990.4628","sourcetype":"farequote","time":"1403481600"} + body: + - airline: AAL + responsetime: 132.2046 + sourcetype: farequote + time: 1403481600 + - airline: JZA + responsetime: 990.4628 + sourcetype: farequote + time: 1403481700 - do: - #set the header so we won't randomize it - headers: - Content-Type: application/json catch: /parse_exception/ xpack.ml.post_data: job_id: farequote reset_end: end_not_a_date - body: > - {"airline":"AAL","responsetime":"132.2046","sourcetype":"farequote","time":"1403481600"} - {"airline":"JZA","responsetime":"990.4628","sourcetype":"farequote","time":"1403481600"} + body: + - airline: AAL + responsetime: 132.2046 + sourcetype: farequote + time: 1403481600 + - airline: JZA + responsetime: 990.4628 + sourcetype: farequote + time: 1403481700 --- "Test Flush data with invalid parameters": @@ -186,30 +198,27 @@ setup: --- "Test flushing, posting and closing a closed job": - - skip: - features: ["headers"] - do: - #set the header so we won't randomize it - headers: - Content-Type: application/json catch: /status_exception/ xpack.ml.flush_job: job_id: closed_job - do: - #set the header so we won't randomize it - headers: - Content-Type: application/json catch: /status_exception/ xpack.ml.close_job: job_id: closed_job - do: - #set the header so we won't randomize it - headers: - Content-Type: application/json catch: /status_exception/ xpack.ml.post_data: job_id: closed_job - body: {} + body: + - airline: AAL + responsetime: 132.2046 + sourcetype: farequote + time: 1403481600 + - airline: JZA + responsetime: 990.4628 + sourcetype: farequote + time: 1403481700 diff --git a/plugin/src/test/resources/rest-api-spec/test/ml/start_stop_datafeed.yaml b/plugin/src/test/resources/rest-api-spec/test/ml/start_stop_datafeed.yaml index 2d9eba3b25e..7bc8a9db7cf 100644 --- a/plugin/src/test/resources/rest-api-spec/test/ml/start_stop_datafeed.yaml +++ b/plugin/src/test/resources/rest-api-spec/test/ml/start_stop_datafeed.yaml @@ -21,7 +21,7 @@ setup: "detectors" :[{"function":"metric","field_name":"responsetime","by_field_name":"airline"}] }, "data_description" : { - "format":"JSON", + "format":"xcontent", "time_field":"time", "time_format":"epoch" }