[ML] Support all XContent types in Data API (elastic/x-pack-elasticsearch#812)

* [ML] Support all XContent types in Data API

This changes the POST Data API so that it accepts all XContent types instead of just JSON.
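For example (a minimal sketch; the job id "farequote" and the field values are illustrative, borrowed from the integration tests further down), a client can now ship SMILE-encoded records:

import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentType;

// Encode one record as SMILE rather than JSON.
XContentBuilder builder = XContentFactory.contentBuilder(XContentType.SMILE);
builder.startObject()
        .field("airline", "AAL")
        .field("responsetime", "132.2046")
        .field("time", "1403481600")
        .endObject();

PostDataAction.Request request = new PostDataAction.Request("farequote");
// The content type now travels with the bytes instead of being assumed to be JSON.
request.setContent(builder.bytes(), XContentType.SMILE);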

For now, the datafeed is restricted to sending only JSON to the POST Data API.

* Rename SimpleJsonRecordReader to XContentRecordReader

Also renames `DataFormat.JSON` to `DataFormat.XCONTENT`

* Fix YAML tests

Original commit: elastic/x-pack-elasticsearch@5fd20690b8
Colin Goodheart-Smithe 2017-03-24 12:57:02 +00:00 committed by GitHub
parent 73434dadeb
commit fd54515ecb
39 changed files with 291 additions and 194 deletions

View File

@ -22,6 +22,7 @@ import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.StatusToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
@ -135,6 +136,7 @@ public class PostDataAction extends Action<PostDataAction.Request, PostDataActio
private String resetStart = "";
private String resetEnd = "";
private DataDescription dataDescription;
private XContentType xContentType;
private BytesReference content;
Request() {
@ -170,8 +172,13 @@ public class PostDataAction extends Action<PostDataAction.Request, PostDataActio
public BytesReference getContent() { return content; }
public void setContent(BytesReference content) {
public XContentType getXContentType() {
return xContentType;
}
public void setContent(BytesReference content, XContentType xContentType) {
this.content = content;
this.xContentType = xContentType;
}
@Override
@ -181,6 +188,9 @@ public class PostDataAction extends Action<PostDataAction.Request, PostDataActio
resetEnd = in.readOptionalString();
dataDescription = in.readOptionalWriteable(DataDescription::new);
content = in.readBytesReference();
if (in.readBoolean()) {
xContentType = XContentType.readFrom(in);
}
}
@Override
@ -190,12 +200,17 @@ public class PostDataAction extends Action<PostDataAction.Request, PostDataActio
out.writeOptionalString(resetEnd);
out.writeOptionalWriteable(dataDescription);
out.writeBytesReference(content);
boolean hasXContentType = xContentType != null;
out.writeBoolean(hasXContentType);
if (hasXContentType) {
xContentType.writeTo(out);
}
}
@Override
public int hashCode() {
// content stream not included
return Objects.hash(jobId, resetStart, resetEnd, dataDescription);
return Objects.hash(jobId, resetStart, resetEnd, dataDescription, xContentType);
}
@Override
@ -212,7 +227,8 @@ public class PostDataAction extends Action<PostDataAction.Request, PostDataActio
return Objects.equals(jobId, other.jobId) &&
Objects.equals(resetStart, other.resetStart) &&
Objects.equals(resetEnd, other.resetEnd) &&
Objects.equals(dataDescription, other.dataDescription);
Objects.equals(dataDescription, other.dataDescription) &&
Objects.equals(xContentType, other.xContentType);
}
}
@ -239,7 +255,8 @@ public class PostDataAction extends Action<PostDataAction.Request, PostDataActio
TimeRange timeRange = TimeRange.builder().startTime(request.getResetStart()).endTime(request.getResetEnd()).build();
DataLoadParams params = new DataLoadParams(timeRange, Optional.ofNullable(request.getDataDescription()));
try {
DataCounts dataCounts = processManager.processData(request.getJobId(), request.content.streamInput(), params);
DataCounts dataCounts = processManager.processData(request.getJobId(),
request.content.streamInput(), request.getXContentType(), params);
listener.onResponse(new Response(dataCounts));
} catch (Exception e) {
listener.onFailure(e);
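The request serializes the content type as an optional field: a presence flag first, then the enum. A minimal round-trip sketch of that pattern, reusing the stream classes already used elsewhere in this commit:

import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.xcontent.XContentType;

// Write side: a boolean presence flag, then the enum itself.
BytesStreamOutput out = new BytesStreamOutput();
XContentType xContentType = XContentType.JSON;
boolean hasXContentType = xContentType != null;
out.writeBoolean(hasXContentType);
if (hasXContentType) {
    xContentType.writeTo(out);
}

// Read side must mirror the writer exactly.
StreamInput in = out.bytes().streamInput();
XContentType roundTripped = in.readBoolean() ? XContentType.readFrom(in) : null;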

View File

@ -10,6 +10,7 @@ import org.elasticsearch.client.Client;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.mapper.DateFieldMapper;
import org.elasticsearch.xpack.ml.action.FlushJobAction;
import org.elasticsearch.xpack.ml.action.PostDataAction;
@ -155,7 +156,7 @@ class DatafeedJob {
if (extractedData.isPresent()) {
DataCounts counts;
try (InputStream in = extractedData.get()) {
counts = postData(in);
counts = postData(in, XContentType.JSON);
} catch (Exception e) {
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
@ -189,12 +190,13 @@ class DatafeedJob {
client.execute(FlushJobAction.INSTANCE, flushRequest).actionGet();
}
private DataCounts postData(InputStream inputStream) throws IOException {
private DataCounts postData(InputStream inputStream, XContentType xContentType)
throws IOException {
PostDataAction.Request request = new PostDataAction.Request(jobId);
request.setDataDescription(dataDescription);
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
Streams.copy(inputStream, outputStream);
request.setContent(new BytesArray(outputStream.toByteArray()));
request.setContent(new BytesArray(outputStream.toByteArray()), xContentType);
PostDataAction.Response response = client.execute(PostDataAction.INSTANCE, request).actionGet();
return response.getDataCounts();
}

View File

@ -244,7 +244,7 @@ public class DatafeedJobRunner extends AbstractComponent {
private static DataDescription buildDataDescription(Job job) {
DataDescription.Builder dataDescription = new DataDescription.Builder();
dataDescription.setFormat(DataDescription.DataFormat.JSON);
dataDescription.setFormat(DataDescription.DataFormat.XCONTENT);
if (job.getDataDescription() != null) {
dataDescription.setTimeField(job.getDataDescription().getTimeField());
}

View File

@ -40,7 +40,7 @@ public class DataDescription extends ToXContentToBytes implements Writeable {
* Enum of the acceptable data formats.
*/
public enum DataFormat implements Writeable {
JSON,
XCONTENT,
DELIMITED;
/**
@ -175,7 +175,7 @@ public class DataDescription extends ToXContentToBytes implements Writeable {
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
if (dataFormat != DataFormat.JSON) {
if (dataFormat != DataFormat.XCONTENT) {
builder.field(FORMAT_FIELD.getPreferredName(), dataFormat);
}
builder.field(TIME_FIELD_NAME_FIELD.getPreferredName(), timeFieldName);
@ -192,7 +192,7 @@ public class DataDescription extends ToXContentToBytes implements Writeable {
/**
* The format of the data to be processed.
* Defaults to {@link DataDescription.DataFormat#JSON}
* Defaults to {@link DataDescription.DataFormat#XCONTENT}
*
* @return The data format
*/
@ -252,7 +252,7 @@ public class DataDescription extends ToXContentToBytes implements Writeable {
* @return True if the data should be transformed.
*/
public boolean transform() {
return dataFormat == DataFormat.JSON || isTransformTime();
return dataFormat == DataFormat.XCONTENT || isTransformTime();
}
/**
@ -315,7 +315,7 @@ public class DataDescription extends ToXContentToBytes implements Writeable {
public static class Builder {
private DataFormat dataFormat = DataFormat.JSON;
private DataFormat dataFormat = DataFormat.XCONTENT;
private String timeFieldName = DEFAULT_TIME_FIELD;
private String timeFormat = EPOCH_MS;
private Character fieldDelimiter;

View File

@ -12,6 +12,7 @@ import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.common.CheckedSupplier;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.xpack.ml.job.config.DataDescription;
import org.elasticsearch.xpack.ml.job.config.DetectionRule;
@ -79,7 +80,8 @@ public class AutodetectCommunicator implements Closeable {
dataCountsReporter, xContentRegistry);
}
public DataCounts writeToJob(InputStream inputStream, DataLoadParams params) throws IOException {
public DataCounts writeToJob(InputStream inputStream, XContentType xContentType,
DataLoadParams params) throws IOException {
return checkAndRun(() -> Messages.getMessage(Messages.JOB_DATA_CONCURRENT_USE_UPLOAD, job.getId()), () -> {
if (params.isResettingBuckets()) {
autodetectProcess.writeResetBucketsControlMessage(params);
@ -87,7 +89,7 @@ public class AutodetectCommunicator implements Closeable {
CountingInputStream countingStream = new CountingInputStream(inputStream, dataCountsReporter);
DataToProcessWriter autoDetectWriter = createProcessWriter(params.getDataDescription());
DataCounts results = autoDetectWriter.write(countingStream);
DataCounts results = autoDetectWriter.write(countingStream, xContentType);
autoDetectWriter.flush();
return results;
}, false);

View File

@ -16,6 +16,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.ml.MachineLearning;
@ -135,16 +136,18 @@ public class AutodetectProcessManager extends AbstractComponent {
*
* @param jobId the jobId
* @param input Data input stream
* @param xContentType the {@link XContentType} of the input
* @param params Data processing parameters
* @return Count of records, fields, bytes, etc written
*/
public DataCounts processData(String jobId, InputStream input, DataLoadParams params) {
public DataCounts processData(String jobId, InputStream input, XContentType xContentType,
DataLoadParams params) {
AutodetectCommunicator communicator = autoDetectCommunicatorByJob.get(jobId);
if (communicator == null) {
throw new IllegalStateException("[" + jobId + "] Cannot process data: no active autodetect process for job");
}
try {
return communicator.writeToJob(input, params);
return communicator.writeToJob(input, xContentType, params);
// TODO check for errors from autodetect
} catch (IOException e) {
String msg = String.format(Locale.ROOT, "Exception writing to process for job %s", jobId);

View File

@ -76,11 +76,11 @@ public abstract class AbstractDataToProcessWriter implements DataToProcessWriter
}
/**
* Set up the field index mappings.
* This must be called before {@linkplain DataToProcessWriter#write(java.io.InputStream)}.
* Set up the field index mappings. This must be called before
* {@linkplain DataToProcessWriter#write(java.io.InputStream, org.elasticsearch.common.xcontent.XContentType)}.
* <p>
* Finds the required input indexes in the <code>header</code>
* and sets the mappings to the corresponding output indexes.
* Finds the required input indexes in the <code>header</code> and sets the
* mappings to the corresponding output indexes.
*/
void buildFieldIndexMapping(String[] header) throws IOException {
Collection<String> inputFields = inputFields();

View File

@ -7,11 +7,12 @@ package org.elasticsearch.xpack.ml.job.process.autodetect.writer;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.xpack.ml.job.config.AnalysisConfig;
import org.elasticsearch.xpack.ml.job.process.DataCountsReporter;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts;
import org.elasticsearch.xpack.ml.job.config.DataDescription;
import org.elasticsearch.xpack.ml.job.process.DataCountsReporter;
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcess;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts;
import org.supercsv.io.CsvListReader;
import org.supercsv.prefs.CsvPreference;
@ -65,7 +66,7 @@ class CsvDataToProcessWriter extends AbstractDataToProcessWriter {
* header an exception is thrown
*/
@Override
public DataCounts write(InputStream inputStream) throws IOException {
public DataCounts write(InputStream inputStream, XContentType xContentType) throws IOException {
CsvPreference csvPref = new CsvPreference.Builder(
dataDescription.getQuoteCharacter(),
dataDescription.getFieldDelimiter(),

View File

@ -5,6 +5,7 @@
*/
package org.elasticsearch.xpack.ml.job.process.autodetect.writer;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts;
import java.io.IOException;
@ -32,7 +33,7 @@ public interface DataToProcessWriter {
*
* @return Counts of the records processed, bytes read etc
*/
DataCounts write(InputStream inputStream) throws IOException;
DataCounts write(InputStream inputStream, XContentType xContentType) throws IOException;
/**
* Flush the outputstream

View File

@ -33,7 +33,7 @@ public final class DataToProcessWriterFactory {
AnalysisConfig analysisConfig, DataCountsReporter dataCountsReporter,
NamedXContentRegistry xContentRegistry) {
switch (dataDescription.getFormat()) {
case JSON:
case XCONTENT:
return new JsonDataToProcessWriter(includeControlField, autodetectProcess,
dataDescription, analysisConfig, dataCountsReporter, xContentRegistry);
case DELIMITED:

View File

@ -52,10 +52,10 @@ class JsonDataToProcessWriter extends AbstractDataToProcessWriter {
* timeField is missing from the JSON inputIndex an exception is thrown
*/
@Override
public DataCounts write(InputStream inputStream) throws IOException {
public DataCounts write(InputStream inputStream, XContentType xContentType) throws IOException {
dataCountsReporter.startNewIncrementalCount();
try (XContentParser parser = XContentFactory.xContent(XContentType.JSON)
try (XContentParser parser = XContentFactory.xContent(xContentType)
.createParser(xContentRegistry, inputStream)) {
writeJson(parser);
@ -78,7 +78,7 @@ class JsonDataToProcessWriter extends AbstractDataToProcessWriter {
// We never expect to get the control field
boolean[] gotFields = new boolean[analysisFields.size()];
SimpleJsonRecordReader recordReader = new SimpleJsonRecordReader(parser, inFieldIndexes,
XContentRecordReader recordReader = new XContentRecordReader(parser, inFieldIndexes,
LOGGER);
long inputFieldCount = recordReader.read(input, gotFields);
while (inputFieldCount >= 0) {

View File

@ -18,7 +18,7 @@ import java.util.Deque;
import java.util.Map;
import java.util.Objects;
class SimpleJsonRecordReader {
class XContentRecordReader {
static final int PARSE_ERRORS_LIMIT = 100;
protected final XContentParser parser;
@ -40,26 +40,28 @@ class SimpleJsonRecordReader {
* @param logger
* logger
*/
SimpleJsonRecordReader(XContentParser parser, Map<String, Integer> fieldMap, Logger logger) {
XContentRecordReader(XContentParser parser, Map<String, Integer> fieldMap, Logger logger) {
this.parser = Objects.requireNonNull(parser);
this.fieldMap = Objects.requireNonNull(fieldMap);
this.logger = Objects.requireNonNull(logger);
}
/**
* Read the JSON object and write to the record array.
* Nested objects are flattened with the field names separated by
* a '.'.
* e.g. for a record with a nested 'tags' object:
* "{"name":"my.test.metric1","tags":{"tag1":"blah","tag2":"boo"},"time":1350824400,"value":12345.678}"
* use 'tags.tag1' to reference the tag1 field in the nested object
* Read the JSON object and write to the record array. Nested objects are
* flattened with the field names separated by a '.'. e.g. for a record with
* a nested 'tags' object:
* "{"name":"my.test.metric1","tags":{"tag1":"blah","tag2":"boo"},
* "time":1350824400,"value":12345.678}" use 'tags.tag1' to reference the
* tag1 field in the nested object
* <p>
* Array fields in the JSON are ignored
*
* @param record Read fields are written to this array. This array is first filled with empty
* strings and will never contain a <code>null</code>
* @param gotFields boolean array each element is true if that field
* was read
* @param record
* Read fields are written to this array. This array is first
* filled with empty strings and will never contain a
* <code>null</code>
* @param gotFields
* boolean array each element is true if that field was read
* @return The number of fields in the JSON doc or -1 if nothing was read
* because the end of the stream was reached
*/

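A minimal usage sketch of the renamed reader, following the javadoc above (parser and logger construction are assumed; the field positions are illustrative, matching XContentRecordReaderTests below):

Map<String, Integer> fieldMap = new HashMap<>();
fieldMap.put("name", 0);
fieldMap.put("tags.tag1", 1); // nested field, flattened with '.'
fieldMap.put("time", 2);

XContentRecordReader reader = new XContentRecordReader(parser, fieldMap, logger);
String[] record = new String[3];
boolean[] gotFields = new boolean[3];
long fieldsRead = reader.read(record, gotFields); // -1 once the stream is exhausted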
View File

@ -33,7 +33,7 @@ public class RestPostDataAction extends BaseRestHandler {
PostDataAction.Request request = new PostDataAction.Request(restRequest.param(Job.ID.getPreferredName()));
request.setResetStart(restRequest.param(PostDataAction.Request.RESET_START.getPreferredName(), DEFAULT_RESET_START));
request.setResetEnd(restRequest.param(PostDataAction.Request.RESET_END.getPreferredName(), DEFAULT_RESET_END));
request.setContent(restRequest.content());
request.setContent(restRequest.content(), restRequest.getXContentType());
return channel -> client.execute(PostDataAction.INSTANCE, request, new RestStatusToXContentListener<>(channel));
}

View File

@ -5,6 +5,10 @@
*/
package org.elasticsearch.xpack.ml.action;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.xpack.ml.job.config.DataDescription;
import org.elasticsearch.xpack.ml.job.config.DataDescription.DataFormat;
import org.elasticsearch.xpack.ml.support.AbstractStreamableTestCase;
public class PostDataActionRequestTests extends AbstractStreamableTestCase<PostDataAction.Request> {
@ -17,6 +21,14 @@ public class PostDataActionRequestTests extends AbstractStreamableTestCase<PostD
if (randomBoolean()) {
request.setResetEnd(randomAsciiOfLengthBetween(1, 20));
}
if (randomBoolean()) {
request.setDataDescription(new DataDescription(randomFrom(DataFormat.values()),
randomAsciiOfLengthBetween(1, 20), randomAsciiOfLengthBetween(1, 20),
randomAsciiOfLength(1).charAt(0), randomAsciiOfLength(1).charAt(0)));
}
if (randomBoolean()) {
request.setContent(new BytesArray(new byte[0]), randomFrom(XContentType.values()));
}
return request;
}

View File

@ -15,9 +15,11 @@ import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.mock.orig.Mockito;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
@ -40,10 +42,10 @@ import org.elasticsearch.xpack.ml.job.persistence.MockClientBuilder;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts;
import org.elasticsearch.xpack.ml.notifications.AuditMessage;
import org.elasticsearch.xpack.ml.notifications.Auditor;
import org.elasticsearch.xpack.persistent.PersistentTasksService;
import org.elasticsearch.xpack.persistent.PersistentTasksService.PersistentTaskOperationListener;
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask;
import org.elasticsearch.xpack.persistent.PersistentTasksService;
import org.elasticsearch.xpack.persistent.PersistentTasksService.PersistentTaskOperationListener;
import org.junit.Before;
import org.mockito.ArgumentCaptor;
@ -63,9 +65,9 @@ import static org.elasticsearch.xpack.ml.action.OpenJobActionTests.createJobTask
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.same;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
@ -183,7 +185,9 @@ public class DatafeedJobRunnerTests extends ESTestCase {
DataExtractor dataExtractor = mock(DataExtractor.class);
when(dataExtractorFactory.newExtractor(0L, 60000L)).thenReturn(dataExtractor);
when(dataExtractor.hasNext()).thenReturn(true).thenReturn(false);
InputStream in = new ByteArrayInputStream("".getBytes(Charset.forName("utf-8")));
byte[] contentBytes = "".getBytes(Charset.forName("utf-8"));
XContentType xContentType = XContentType.JSON;
InputStream in = new ByteArrayInputStream(contentBytes);
when(dataExtractor.next()).thenReturn(Optional.of(in));
DataCounts dataCounts = new DataCounts("job_id", 1, 0, 0, 0, 0, 0, 0, 0, 0, 0,
new Date(0), new Date(0), new Date(0), new Date(0), new Date(0));
@ -194,16 +198,19 @@ public class DatafeedJobRunnerTests extends ESTestCase {
verify(threadPool, times(1)).executor(MachineLearning.DATAFEED_RUNNER_THREAD_POOL_NAME);
verify(threadPool, never()).schedule(any(), any(), any());
verify(client).execute(same(PostDataAction.INSTANCE), eq(createExpectedPostDataRequest("job_id")));
verify(client).execute(same(PostDataAction.INSTANCE),
eq(createExpectedPostDataRequest("job_id", contentBytes, xContentType)));
verify(client).execute(same(FlushJobAction.INSTANCE), any());
}
private static PostDataAction.Request createExpectedPostDataRequest(String jobId) {
private static PostDataAction.Request createExpectedPostDataRequest(String jobId,
byte[] contentBytes, XContentType xContentType) {
DataDescription.Builder expectedDataDescription = new DataDescription.Builder();
expectedDataDescription.setTimeFormat("epoch_ms");
expectedDataDescription.setFormat(DataDescription.DataFormat.JSON);
expectedDataDescription.setFormat(DataDescription.DataFormat.XCONTENT);
PostDataAction.Request expectedPostDataRequest = new PostDataAction.Request(jobId);
expectedPostDataRequest.setDataDescription(expectedDataDescription.build());
expectedPostDataRequest.setContent(new BytesArray(contentBytes), xContentType);
return expectedPostDataRequest;
}
@ -265,7 +272,9 @@ public class DatafeedJobRunnerTests extends ESTestCase {
DataExtractor dataExtractor = mock(DataExtractor.class);
when(dataExtractorFactory.newExtractor(0L, 60000L)).thenReturn(dataExtractor);
when(dataExtractor.hasNext()).thenReturn(true).thenReturn(false);
InputStream in = new ByteArrayInputStream("".getBytes(Charset.forName("utf-8")));
byte[] contentBytes = "".getBytes(Charset.forName("utf-8"));
InputStream in = new ByteArrayInputStream(contentBytes);
XContentType xContentType = XContentType.JSON;
when(dataExtractor.next()).thenReturn(Optional.of(in));
DataCounts dataCounts = new DataCounts("job_id", 1, 0, 0, 0, 0, 0, 0, 0, 0, 0,
new Date(0), new Date(0), new Date(0), new Date(0), new Date(0));
@ -282,7 +291,8 @@ public class DatafeedJobRunnerTests extends ESTestCase {
task.stop("test");
verify(handler).accept(null);
} else {
verify(client).execute(same(PostDataAction.INSTANCE), eq(createExpectedPostDataRequest("job_id")));
verify(client).execute(same(PostDataAction.INSTANCE),
eq(createExpectedPostDataRequest("job_id", contentBytes, xContentType)));
verify(client).execute(same(FlushJobAction.INSTANCE), any());
verify(threadPool, times(1)).schedule(eq(new TimeValue(480100)), eq(MachineLearning.DATAFEED_RUNNER_THREAD_POOL_NAME), any());
}

View File

@ -7,6 +7,8 @@ package org.elasticsearch.xpack.ml.datafeed;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.ml.action.FlushJobAction;
import org.elasticsearch.xpack.ml.action.PostDataAction;
@ -47,6 +49,7 @@ public class DatafeedJobTests extends ESTestCase {
private ArgumentCaptor<FlushJobAction.Request> flushJobRequests;
private long currentTime;
private XContentType xContentType;
@Before
@SuppressWarnings("unchecked")
@ -57,19 +60,22 @@ public class DatafeedJobTests extends ESTestCase {
when(dataExtractorFactory.newExtractor(anyLong(), anyLong())).thenReturn(dataExtractor);
client = mock(Client.class);
dataDescription = new DataDescription.Builder();
dataDescription.setFormat(DataDescription.DataFormat.JSON);
dataDescription.setFormat(DataDescription.DataFormat.XCONTENT);
ActionFuture<PostDataAction.Response> jobDataFuture = mock(ActionFuture.class);
flushJobFuture = mock(ActionFuture.class);
currentTime = 0;
xContentType = XContentType.JSON;
when(dataExtractor.hasNext()).thenReturn(true).thenReturn(false);
InputStream inputStream = new ByteArrayInputStream("content".getBytes(StandardCharsets.UTF_8));
byte[] contentBytes = "content".getBytes(StandardCharsets.UTF_8);
InputStream inputStream = new ByteArrayInputStream(contentBytes);
when(dataExtractor.next()).thenReturn(Optional.of(inputStream));
DataCounts dataCounts = new DataCounts("_job_id", 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, new Date(0), new Date(0),
new Date(0), new Date(0), new Date(0));
PostDataAction.Request expectedRequest = new PostDataAction.Request("_job_id");
expectedRequest.setDataDescription(dataDescription.build());
expectedRequest.setContent(new BytesArray(contentBytes), xContentType);
when(client.execute(same(PostDataAction.INSTANCE), eq(expectedRequest))).thenReturn(jobDataFuture);
when(jobDataFuture.actionGet()).thenReturn(new PostDataAction.Response(dataCounts));

View File

@ -11,6 +11,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.MlMetadata;
@ -325,7 +326,7 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase {
postDataRequest.setContent(new BytesArray(
"{\"airline\":\"AAL\",\"responsetime\":\"132.2046\",\"sourcetype\":\"farequote\",\"time\":\"1403481600\"}\n" +
"{\"airline\":\"JZA\",\"responsetime\":\"990.4628\",\"sourcetype\":\"farequote\",\"time\":\"1403481700\"}"
));
), XContentType.JSON);
PostDataAction.Response response = client().execute(PostDataAction.INSTANCE, postDataRequest).actionGet();
assertEquals(2, response.getDataCounts().getProcessedRecordCount());

View File

@ -488,7 +488,8 @@ public class DatafeedJobIT extends ESRestTestCase {
String job = "{\n" + " \"description\":\"Analysis of response time by airline\",\n"
+ " \"analysis_config\" : {\n" + " \"bucket_span\":\"1h\",\n"
+ " \"detectors\" :[{\"function\":\"mean\",\"field_name\":\"responsetime\",\"by_field_name\":\"airline\"}]\n"
+ " },\n" + " \"data_description\" : {\n" + " \"format\":\"JSON\",\n"
+ " },\n" + " \"data_description\" : {\n"
+ " \"format\":\"xcontent\",\n"
+ " \"time_field\":\"time stamp\",\n" + " \"time_format\":\"yyyy-MM-dd'T'HH:mm:ssX\"\n" + " }\n"
+ "}";
return client().performRequest("put", MachineLearning.BASE_PATH + "anomaly_detectors/" + id,

View File

@ -25,7 +25,7 @@ public class DataDescriptionTests extends AbstractSerializingTestCase<DataDescri
public void testDefault() {
DataDescription dataDescription = new DataDescription.Builder().build();
assertThat(dataDescription.getFormat(), equalTo(DataFormat.JSON));
assertThat(dataDescription.getFormat(), equalTo(DataFormat.XCONTENT));
assertThat(dataDescription.getTimeField(), equalTo("time"));
assertThat(dataDescription.getTimeFormat(), equalTo("epoch_ms"));
assertThat(dataDescription.getFieldDelimiter(), is(nullValue()));
@ -102,7 +102,7 @@ public class DataDescriptionTests extends AbstractSerializingTestCase<DataDescri
public void testEquals_GivenDifferentDateFormat() {
DataDescription.Builder description1 = new DataDescription.Builder();
description1.setFormat(DataFormat.JSON);
description1.setFormat(DataFormat.XCONTENT);
description1.setQuoteCharacter('"');
description1.setTimeField("timestamp");
description1.setTimeFormat("epoch");
@ -121,14 +121,14 @@ public class DataDescriptionTests extends AbstractSerializingTestCase<DataDescri
public void testEquals_GivenDifferentQuoteCharacter() {
DataDescription.Builder description1 = new DataDescription.Builder();
description1.setFormat(DataFormat.JSON);
description1.setFormat(DataFormat.XCONTENT);
description1.setQuoteCharacter('"');
description1.setTimeField("timestamp");
description1.setTimeFormat("epoch");
description1.setFieldDelimiter(',');
DataDescription.Builder description2 = new DataDescription.Builder();
description2.setFormat(DataFormat.JSON);
description2.setFormat(DataFormat.XCONTENT);
description2.setQuoteCharacter('\'');
description2.setTimeField("timestamp");
description2.setTimeFormat("epoch");
@ -140,14 +140,14 @@ public class DataDescriptionTests extends AbstractSerializingTestCase<DataDescri
public void testEquals_GivenDifferentTimeField() {
DataDescription.Builder description1 = new DataDescription.Builder();
description1.setFormat(DataFormat.JSON);
description1.setFormat(DataFormat.XCONTENT);
description1.setQuoteCharacter('"');
description1.setTimeField("timestamp");
description1.setTimeFormat("epoch");
description1.setFieldDelimiter(',');
DataDescription.Builder description2 = new DataDescription.Builder();
description2.setFormat(DataFormat.JSON);
description2.setFormat(DataFormat.XCONTENT);
description2.setQuoteCharacter('"');
description2.setTimeField("time");
description2.setTimeFormat("epoch");
@ -159,14 +159,14 @@ public class DataDescriptionTests extends AbstractSerializingTestCase<DataDescri
public void testEquals_GivenDifferentTimeFormat() {
DataDescription.Builder description1 = new DataDescription.Builder();
description1.setFormat(DataFormat.JSON);
description1.setFormat(DataFormat.XCONTENT);
description1.setQuoteCharacter('"');
description1.setTimeField("timestamp");
description1.setTimeFormat("epoch");
description1.setFieldDelimiter(',');
DataDescription.Builder description2 = new DataDescription.Builder();
description2.setFormat(DataFormat.JSON);
description2.setFormat(DataFormat.XCONTENT);
description2.setQuoteCharacter('"');
description2.setTimeField("timestamp");
description2.setTimeFormat("epoch_ms");
@ -178,14 +178,14 @@ public class DataDescriptionTests extends AbstractSerializingTestCase<DataDescri
public void testEquals_GivenDifferentFieldDelimiter() {
DataDescription.Builder description1 = new DataDescription.Builder();
description1.setFormat(DataFormat.JSON);
description1.setFormat(DataFormat.XCONTENT);
description1.setQuoteCharacter('"');
description1.setTimeField("timestamp");
description1.setTimeFormat("epoch");
description1.setFieldDelimiter(',');
DataDescription.Builder description2 = new DataDescription.Builder();
description2.setFormat(DataFormat.JSON);
description2.setFormat(DataFormat.XCONTENT);
description2.setQuoteCharacter('"');
description2.setTimeField("timestamp");
description2.setTimeFormat("epoch");

View File

@ -5,13 +5,13 @@
*/
package org.elasticsearch.xpack.ml.job.config;
import java.io.IOException;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.ml.job.config.DataDescription.DataFormat;
import java.io.IOException;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
@ -23,23 +23,23 @@ public class DataFormatTests extends ESTestCase {
assertEquals(DataFormat.DELIMITED, DataFormat.forString("delimited"));
assertEquals(DataFormat.DELIMITED, DataFormat.forString("DELIMITED"));
assertEquals(DataFormat.JSON, DataFormat.forString("json"));
assertEquals(DataFormat.JSON, DataFormat.forString("JSON"));
assertEquals(DataFormat.XCONTENT, DataFormat.forString("xcontent"));
assertEquals(DataFormat.XCONTENT, DataFormat.forString("XCONTENT"));
}
public void testToString() {
assertEquals("delimited", DataFormat.DELIMITED.toString());
assertEquals("json", DataFormat.JSON.toString());
assertEquals("xcontent", DataFormat.XCONTENT.toString());
}
public void testValidOrdinals() {
assertThat(DataFormat.JSON.ordinal(), equalTo(0));
assertThat(DataFormat.XCONTENT.ordinal(), equalTo(0));
assertThat(DataFormat.DELIMITED.ordinal(), equalTo(1));
}
public void testwriteTo() throws Exception {
try (BytesStreamOutput out = new BytesStreamOutput()) {
DataFormat.JSON.writeTo(out);
DataFormat.XCONTENT.writeTo(out);
try (StreamInput in = out.bytes().streamInput()) {
assertThat(in.readVInt(), equalTo(0));
}
@ -57,7 +57,7 @@ public class DataFormatTests extends ESTestCase {
try (BytesStreamOutput out = new BytesStreamOutput()) {
out.writeVInt(0);
try (StreamInput in = out.bytes().streamInput()) {
assertThat(DataFormat.readFromStream(in), equalTo(DataFormat.JSON));
assertThat(DataFormat.readFromStream(in), equalTo(DataFormat.XCONTENT));
}
}
try (BytesStreamOutput out = new BytesStreamOutput()) {

View File

@ -8,6 +8,7 @@ package org.elasticsearch.xpack.ml.job.process.autodetect;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.ml.job.config.AnalysisConfig;
import org.elasticsearch.xpack.ml.job.config.DataDescription;
@ -49,7 +50,8 @@ public class AutodetectCommunicatorTests extends ESTestCase {
DataLoadParams params = new DataLoadParams(TimeRange.builder().startTime("1").endTime("2").build(), Optional.empty());
AutodetectProcess process = mockAutodetectProcessWithOutputStream();
try (AutodetectCommunicator communicator = createAutodetectCommunicator(process, mock(AutoDetectResultProcessor.class))) {
communicator.writeToJob(new ByteArrayInputStream(new byte[0]), params);
communicator.writeToJob(new ByteArrayInputStream(new byte[0]),
randomFrom(XContentType.values()), params);
Mockito.verify(process).writeResetBucketsControlMessage(params);
}
}
@ -159,13 +161,15 @@ public class AutodetectCommunicatorTests extends ESTestCase {
when(in.read(any(byte[].class), anyInt(), anyInt())).thenReturn(-1);
AutodetectProcess process = mockAutodetectProcessWithOutputStream();
AutodetectCommunicator communicator = createAutodetectCommunicator(process, mock(AutoDetectResultProcessor.class));
XContentType xContentType = randomFrom(XContentType.values());
communicator.inUse.set(new CountDownLatch(1));
expectThrows(ElasticsearchStatusException.class,
() -> communicator.writeToJob(in, mock(DataLoadParams.class)));
() -> communicator.writeToJob(in, xContentType, mock(DataLoadParams.class)));
communicator.inUse.set(null);
communicator.writeToJob(in, new DataLoadParams(TimeRange.builder().build(), Optional.empty()));
communicator.writeToJob(in, xContentType,
new DataLoadParams(TimeRange.builder().build(), Optional.empty()));
}
public void testFlushInUse() throws IOException {

View File

@ -12,6 +12,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.ml.job.JobManager;
@ -67,9 +68,10 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
/**
* Calling the {@link AutodetectProcessManager#processData(String, InputStream, DataLoadParams)}
* method causes an AutodetectCommunicator to be created on demand. Most of these tests have to
* do that before they can assert other things
* Calling the
* {@link AutodetectProcessManager#processData(String, InputStream, XContentType, DataLoadParams)}
* method causes an AutodetectCommunicator to be created on demand. Most of
* these tests have to do that before they can assert other things
*/
public class AutodetectProcessManagerTests extends ESTestCase {
@ -203,7 +205,8 @@ public class AutodetectProcessManagerTests extends ESTestCase {
DataLoadParams params = new DataLoadParams(TimeRange.builder().build(), Optional.empty());
manager.openJob("foo", 1L, false, e -> {});
manager.processData("foo", createInputStream(""), params);
manager.processData("foo", createInputStream(""), randomFrom(XContentType.values()),
params);
assertEquals(1, manager.numberOfOpenJobs());
}
@ -213,11 +216,13 @@ public class AutodetectProcessManagerTests extends ESTestCase {
DataLoadParams params = mock(DataLoadParams.class);
InputStream inputStream = createInputStream("");
doThrow(new IOException("blah")).when(communicator).writeToJob(inputStream, params);
XContentType xContentType = randomFrom(XContentType.values());
doThrow(new IOException("blah")).when(communicator).writeToJob(inputStream,
xContentType, params);
manager.openJob("foo", 1L, false, e -> {});
ESTestCase.expectThrows(ElasticsearchException.class,
() -> manager.processData("foo", inputStream, params));
() -> manager.processData("foo", inputStream, xContentType, params));
}
public void testCloseJob() {
@ -226,7 +231,8 @@ public class AutodetectProcessManagerTests extends ESTestCase {
assertEquals(0, manager.numberOfOpenJobs());
manager.openJob("foo", 1L, false, e -> {});
manager.processData("foo", createInputStream(""), mock(DataLoadParams.class));
manager.processData("foo", createInputStream(""), randomFrom(XContentType.values()),
mock(DataLoadParams.class));
// job is created
assertEquals(1, manager.numberOfOpenJobs());
@ -237,12 +243,13 @@ public class AutodetectProcessManagerTests extends ESTestCase {
public void testBucketResetMessageIsSent() throws IOException {
AutodetectCommunicator communicator = mock(AutodetectCommunicator.class);
AutodetectProcessManager manager = createManager(communicator);
XContentType xContentType = randomFrom(XContentType.values());
DataLoadParams params = new DataLoadParams(TimeRange.builder().startTime("1000").endTime("2000").build(), Optional.empty());
InputStream inputStream = createInputStream("");
manager.openJob("foo", 1L, false, e -> {});
manager.processData("foo", inputStream, params);
verify(communicator).writeToJob(inputStream, params);
manager.processData("foo", inputStream, xContentType, params);
verify(communicator).writeToJob(inputStream, xContentType, params);
}
public void testFlush() throws IOException {
@ -251,7 +258,8 @@ public class AutodetectProcessManagerTests extends ESTestCase {
InputStream inputStream = createInputStream("");
manager.openJob("foo", 1L, false, e -> {});
manager.processData("foo", inputStream, mock(DataLoadParams.class));
manager.processData("foo", inputStream, randomFrom(XContentType.values()),
mock(DataLoadParams.class));
InterimResultsParams params = InterimResultsParams.builder().build();
manager.flushJob("foo", params);
@ -287,7 +295,8 @@ public class AutodetectProcessManagerTests extends ESTestCase {
assertFalse(manager.jobHasActiveAutodetectProcess("foo"));
manager.openJob("foo", 1L, false, e -> {});
manager.processData("foo", createInputStream(""), mock(DataLoadParams.class));
manager.processData("foo", createInputStream(""), randomFrom(XContentType.values()),
mock(DataLoadParams.class));
assertTrue(manager.jobHasActiveAutodetectProcess("foo"));
assertFalse(manager.jobHasActiveAutodetectProcess("bar"));
@ -295,12 +304,13 @@ public class AutodetectProcessManagerTests extends ESTestCase {
public void testProcessData_GivenStateNotOpened() throws IOException {
AutodetectCommunicator communicator = mock(AutodetectCommunicator.class);
when(communicator.writeToJob(any(), any())).thenReturn(new DataCounts("foo"));
when(communicator.writeToJob(any(), any(), any())).thenReturn(new DataCounts("foo"));
AutodetectProcessManager manager = createManager(communicator);
InputStream inputStream = createInputStream("");
manager.openJob("foo", 1L, false, e -> {});
DataCounts dataCounts = manager.processData("foo", inputStream, mock(DataLoadParams.class));
DataCounts dataCounts = manager.processData("foo", inputStream,
randomFrom(XContentType.values()), mock(DataLoadParams.class));
assertThat(dataCounts, equalTo(new DataCounts("foo")));
}
@ -357,7 +367,8 @@ public class AutodetectProcessManagerTests extends ESTestCase {
private AutodetectProcessManager createManagerAndCallProcessData(AutodetectCommunicator communicator, String jobId) {
AutodetectProcessManager manager = createManager(communicator);
manager.openJob(jobId, 1L, false, e -> {});
manager.processData(jobId, createInputStream(""), mock(DataLoadParams.class));
manager.processData(jobId, createInputStream(""), randomFrom(XContentType.values()),
mock(DataLoadParams.class));
return manager;
}

View File

@ -8,12 +8,12 @@ package org.elasticsearch.xpack.ml.job.process.autodetect.writer;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.ml.job.config.AnalysisConfig;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts;
import org.elasticsearch.xpack.ml.job.config.DataDescription;
import org.elasticsearch.xpack.ml.job.config.DataDescription.DataFormat;
import org.elasticsearch.xpack.ml.job.config.Detector;
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcess;
import org.elasticsearch.xpack.ml.job.process.DataCountsReporter;
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcess;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts;
import org.junit.Before;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
@ -79,7 +79,7 @@ public class CsvDataToProcessWriterTests extends ESTestCase {
InputStream inputStream = createInputStream(input.toString());
CsvDataToProcessWriter writer = createWriter();
writer.writeHeader();
writer.write(inputStream);
writer.write(inputStream, null);
verify(dataCountsReporter, times(1)).startNewIncrementalCount();
List<String[]> expectedRecords = new ArrayList<>();
@ -101,7 +101,7 @@ public class CsvDataToProcessWriterTests extends ESTestCase {
InputStream inputStream = createInputStream(input.toString());
CsvDataToProcessWriter writer = createWriter();
writer.writeHeader();
writer.write(inputStream);
writer.write(inputStream, null);
verify(dataCountsReporter, times(1)).startNewIncrementalCount();
List<String[]> expectedRecords = new ArrayList<>();
@ -125,7 +125,7 @@ public class CsvDataToProcessWriterTests extends ESTestCase {
when(dataCountsReporter.getLatestRecordTime()).thenReturn(new Date(5000L));
CsvDataToProcessWriter writer = createWriter();
writer.writeHeader();
writer.write(inputStream);
writer.write(inputStream, null);
verify(dataCountsReporter, times(1)).startNewIncrementalCount();
List<String[]> expectedRecords = new ArrayList<>();
@ -155,7 +155,7 @@ public class CsvDataToProcessWriterTests extends ESTestCase {
InputStream inputStream = createInputStream(input.toString());
CsvDataToProcessWriter writer = createWriter();
writer.writeHeader();
writer.write(inputStream);
writer.write(inputStream, null);
verify(dataCountsReporter, times(1)).startNewIncrementalCount();
List<String[]> expectedRecords = new ArrayList<>();
@ -188,7 +188,7 @@ public class CsvDataToProcessWriterTests extends ESTestCase {
InputStream inputStream = createInputStream(input.toString());
CsvDataToProcessWriter writer = createWriter();
writer.writeHeader();
writer.write(inputStream);
writer.write(inputStream, null);
verify(dataCountsReporter, times(1)).startNewIncrementalCount();
List<String[]> expectedRecords = new ArrayList<>();
@ -220,7 +220,7 @@ public class CsvDataToProcessWriterTests extends ESTestCase {
CsvDataToProcessWriter writer = createWriter();
writer.writeHeader();
DataCounts counts = writer.write(inputStream);
DataCounts counts = writer.write(inputStream, null);
assertEquals(0L, counts.getInputBytes());
assertEquals(0L, counts.getInputRecordCount());
}
@ -238,7 +238,8 @@ public class CsvDataToProcessWriterTests extends ESTestCase {
CsvDataToProcessWriter writer = createWriter();
writer.writeHeader();
SuperCsvException e = ESTestCase.expectThrows(SuperCsvException.class, () -> writer.write(inputStream));
SuperCsvException e = ESTestCase.expectThrows(SuperCsvException.class,
() -> writer.write(inputStream, null));
// Expected line numbers are 2 and 10001, but SuperCSV may print the
// numbers using a different locale's digit characters
assertTrue(e.getMessage(), e.getMessage().matches(

View File

@ -20,7 +20,7 @@ import static org.mockito.Mockito.mock;
public class DataToProcessWriterFactoryTests extends ESTestCase {
public void testCreate_GivenDataFormatIsJson() {
DataDescription.Builder dataDescription = new DataDescription.Builder();
dataDescription.setFormat(DataFormat.JSON);
dataDescription.setFormat(DataFormat.XCONTENT);
assertTrue(createWriter(dataDescription.build()) instanceof JsonDataToProcessWriter);
}

View File

@ -8,13 +8,14 @@ package org.elasticsearch.xpack.ml.job.process.autodetect.writer;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.ml.job.config.AnalysisConfig;
import org.elasticsearch.xpack.ml.job.config.DataDescription;
import org.elasticsearch.xpack.ml.job.config.DataDescription.DataFormat;
import org.elasticsearch.xpack.ml.job.config.Detector;
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcess;
import org.elasticsearch.xpack.ml.job.process.DataCountsReporter;
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcess;
import org.junit.Before;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
@ -64,7 +65,7 @@ public class JsonDataToProcessWriterTests extends ESTestCase {
dataDescription = new DataDescription.Builder();
dataDescription.setFormat(DataFormat.JSON);
dataDescription.setFormat(DataFormat.XCONTENT);
dataDescription.setTimeFormat(DataDescription.EPOCH);
Detector detector = new Detector.Builder("metric", "value").build();
@ -78,7 +79,7 @@ public class JsonDataToProcessWriterTests extends ESTestCase {
InputStream inputStream = createInputStream(input.toString());
JsonDataToProcessWriter writer = createWriter();
writer.writeHeader();
writer.write(inputStream);
writer.write(inputStream, XContentType.JSON);
verify(dataCountsReporter, times(1)).startNewIncrementalCount();
List<String[]> expectedRecords = new ArrayList<>();
@ -100,7 +101,7 @@ public class JsonDataToProcessWriterTests extends ESTestCase {
InputStream inputStream = createInputStream(input.toString());
JsonDataToProcessWriter writer = createWriter();
writer.writeHeader();
writer.write(inputStream);
writer.write(inputStream, XContentType.JSON);
verify(dataCountsReporter, times(1)).startNewIncrementalCount();
List<String[]> expectedRecords = new ArrayList<>();
@ -129,7 +130,7 @@ public class JsonDataToProcessWriterTests extends ESTestCase {
InputStream inputStream = createInputStream(input.toString());
JsonDataToProcessWriter writer = createWriter();
writer.writeHeader();
writer.write(inputStream);
writer.write(inputStream, XContentType.JSON);
List<String[]> expectedRecords = new ArrayList<>();
// The final field is the control field
@ -158,7 +159,7 @@ public class JsonDataToProcessWriterTests extends ESTestCase {
InputStream inputStream = createInputStream(input.toString());
JsonDataToProcessWriter writer = createWriter();
writer.writeHeader();
writer.write(inputStream);
writer.write(inputStream, XContentType.JSON);
verify(dataCountsReporter, times(1)).startNewIncrementalCount();
List<String[]> expectedRecords = new ArrayList<>();
@ -187,7 +188,7 @@ public class JsonDataToProcessWriterTests extends ESTestCase {
InputStream inputStream = createInputStream(input.toString());
JsonDataToProcessWriter writer = createWriter();
writer.writeHeader();
writer.write(inputStream);
writer.write(inputStream, XContentType.JSON);
verify(dataCountsReporter, times(1)).startNewIncrementalCount();
List<String[]> expectedRecords = new ArrayList<>();
@ -214,7 +215,8 @@ public class JsonDataToProcessWriterTests extends ESTestCase {
JsonDataToProcessWriter writer = createWriter();
writer.writeHeader();
ESTestCase.expectThrows(ElasticsearchParseException.class, () -> writer.write(inputStream));
ESTestCase.expectThrows(ElasticsearchParseException.class,
() -> writer.write(inputStream, XContentType.JSON));
}
public void testWrite_GivenJsonWithArrayField()
@ -229,7 +231,7 @@ public class JsonDataToProcessWriterTests extends ESTestCase {
InputStream inputStream = createInputStream(input.toString());
JsonDataToProcessWriter writer = createWriter();
writer.writeHeader();
writer.write(inputStream);
writer.write(inputStream, XContentType.JSON);
verify(dataCountsReporter, times(1)).startNewIncrementalCount();
List<String[]> expectedRecords = new ArrayList<>();
@ -258,7 +260,7 @@ public class JsonDataToProcessWriterTests extends ESTestCase {
InputStream inputStream = createInputStream(input.toString());
JsonDataToProcessWriter writer = createWriter();
writer.writeHeader();
writer.write(inputStream);
writer.write(inputStream, XContentType.JSON);
verify(dataCountsReporter, times(1)).startNewIncrementalCount();
List<String[]> expectedRecords = new ArrayList<>();

View File

@ -28,13 +28,14 @@ import java.util.Map;
import static org.mockito.Mockito.mock;
public class SimpleJsonRecordReaderTests extends ESTestCase {
public class XContentRecordReaderTests extends ESTestCase {
public void testRead() throws JsonParseException, IOException {
String data = "{\"a\":10, \"b\":20, \"c\":30}\n{\"b\":21, \"a\":11, \"c\":31}\n";
XContentParser parser = createParser(data);
Map<String, Integer> fieldMap = createFieldMap();
SimpleJsonRecordReader reader = new SimpleJsonRecordReader(parser, fieldMap, mock(Logger.class));
XContentRecordReader reader = new XContentRecordReader(parser, fieldMap,
mock(Logger.class));
String record[] = new String[3];
boolean gotFields[] = new boolean[3];
@ -61,7 +62,8 @@ public class SimpleJsonRecordReaderTests extends ESTestCase {
fieldMap.put("b", 1);
fieldMap.put("c.e", 2);
SimpleJsonRecordReader reader = new SimpleJsonRecordReader(parser, fieldMap, mock(Logger.class));
XContentRecordReader reader = new XContentRecordReader(parser, fieldMap,
mock(Logger.class));
String record[] = new String[3];
boolean gotFields[] = new boolean[3];
@ -83,7 +85,8 @@ public class SimpleJsonRecordReaderTests extends ESTestCase {
fieldMap.put("b", 1);
fieldMap.put("c.e", 2);
SimpleJsonRecordReader reader = new SimpleJsonRecordReader(parser, fieldMap, mock(Logger.class));
XContentRecordReader reader = new XContentRecordReader(parser, fieldMap,
mock(Logger.class));
String record[] = new String[3];
boolean gotFields[] = new boolean[3];
@ -98,14 +101,16 @@ public class SimpleJsonRecordReaderTests extends ESTestCase {
public void testRead_GivenMultiValueArrays() throws JsonParseException, IOException {
String data = "{\"a\":[10, 11], \"b\":20, \"c\":{\"d\":30, \"e\":[40, 50]}, \"f\":[\"a\", \"a\", \"a\", \"a\"], \"g\":20}";
String data = "{\"a\":[10, 11], \"b\":20, \"c\":{\"d\":30, \"e\":[40, 50]}, "
+ "\"f\":[\"a\", \"a\", \"a\", \"a\"], \"g\":20}";
XContentParser parser = createParser(data);
Map<String, Integer> fieldMap = new HashMap<>();
fieldMap.put("a", 0);
fieldMap.put("g", 1);
fieldMap.put("c.e", 2);
SimpleJsonRecordReader reader = new SimpleJsonRecordReader(parser, fieldMap, mock(Logger.class));
XContentRecordReader reader = new XContentRecordReader(parser, fieldMap,
mock(Logger.class));
String record[] = new String[3];
boolean gotFields[] = new boolean[3];
@ -119,18 +124,20 @@ public class SimpleJsonRecordReaderTests extends ESTestCase {
}
/**
* There's a problem with the parser where in this case it skips over the first 2 records
* instead of to the end of the first record which is invalid json.
* This means we miss the next record after a bad one.
* There's a problem with the parser where in this case it skips over the
* first 2 records instead of to the end of the first record which is
* invalid json. This means we miss the next record after a bad one.
*/
public void testRead_RecoverFromBadJson() throws JsonParseException, IOException {
// no opening '{'
String data = "\"a\":10, \"b\":20, \"c\":30}\n{\"b\":21, \"a\":11, \"c\":31}\n{\"c\":32, \"b\":22, \"a\":12}";
String data = "\"a\":10, \"b\":20, \"c\":30}\n{\"b\":21, \"a\":11, \"c\":31}\n"
+ "{\"c\":32, \"b\":22, \"a\":12}";
XContentParser parser = createParser(data);
Map<String, Integer> fieldMap = createFieldMap();
SimpleJsonRecordReader reader = new SimpleJsonRecordReader(parser, fieldMap, mock(Logger.class));
XContentRecordReader reader = new XContentRecordReader(parser, fieldMap,
mock(Logger.class));
String record[] = new String[3];
boolean gotFields[] = new boolean[3];
@ -147,12 +154,13 @@ public class SimpleJsonRecordReaderTests extends ESTestCase {
public void testRead_RecoverFromBadNestedJson() throws JsonParseException, IOException {
// nested object 'd' is missing a ','
String data = "{\"a\":10, \"b\":20, \"c\":30}\n" +
"{\"b\":21, \"d\" : {\"ee\": 1 \"ff\":0}, \"a\":11, \"c\":31}";
String data = "{\"a\":10, \"b\":20, \"c\":30}\n"
+ "{\"b\":21, \"d\" : {\"ee\": 1 \"ff\":0}, \"a\":11, \"c\":31}";
XContentParser parser = createParser(data);
Map<String, Integer> fieldMap = createFieldMap();
SimpleJsonRecordReader reader = new SimpleJsonRecordReader(parser, fieldMap, mock(Logger.class));
XContentRecordReader reader = new XContentRecordReader(parser, fieldMap,
mock(Logger.class));
String record[] = new String[3];
boolean gotFields[] = new boolean[3];
@ -173,23 +181,24 @@ public class SimpleJsonRecordReaderTests extends ESTestCase {
// missing a ':'
String format = "{\"a\":1%1$d, \"b\"2%1$d, \"c\":3%1$d}\n";
StringBuilder builder = new StringBuilder();
for (int i = 0; i < SimpleJsonRecordReader.PARSE_ERRORS_LIMIT; i++) {
for (int i = 0; i < XContentRecordReader.PARSE_ERRORS_LIMIT; i++) {
builder.append(String.format(Locale.ROOT, format, i));
}
XContentParser parser = createParser(builder.toString());
Map<String, Integer> fieldMap = createFieldMap();
SimpleJsonRecordReader reader = new SimpleJsonRecordReader(parser, fieldMap, mock(Logger.class));
XContentRecordReader reader = new XContentRecordReader(parser, fieldMap,
mock(Logger.class));
ESTestCase.expectThrows(ElasticsearchParseException.class, () -> readUntilError(reader));
}
private void readUntilError(SimpleJsonRecordReader reader) throws IOException {
private void readUntilError(XContentRecordReader reader) throws IOException {
String record[] = new String[3];
boolean gotFields[] = new boolean[3];
// this should throw after PARSE_ERRORS_LIMIT errors
for (int i = 0; i < SimpleJsonRecordReader.PARSE_ERRORS_LIMIT; i++) {
for (int i = 0; i < XContentRecordReader.PARSE_ERRORS_LIMIT; i++) {
reader.read(record, gotFields);
}
}
@ -198,13 +207,13 @@ public class SimpleJsonRecordReaderTests extends ESTestCase {
char controlChar = '\u0002';
String data = "{\"a\":10, \"" + controlChar + "\" : 5, \"b\":20, \"c\":30}"
+ "\n{\"b\":21, \"a\":11, \"c\":31}"
+ "\n{\"c\":32, \"b\":22, \"a\":12}\n";
+ "\n{\"b\":21, \"a\":11, \"c\":31}" + "\n{\"c\":32, \"b\":22, \"a\":12}\n";
XContentParser parser = createParser(data);
Map<String, Integer> fieldMap = createFieldMap();
SimpleJsonRecordReader reader = new SimpleJsonRecordReader(parser, fieldMap, mock(Logger.class));
XContentRecordReader reader = new XContentRecordReader(parser, fieldMap,
mock(Logger.class));
String record[] = new String[3];
boolean gotFields[] = new boolean[3];
@ -215,8 +224,10 @@ public class SimpleJsonRecordReaderTests extends ESTestCase {
}
private XContentParser createParser(String input) throws JsonParseException, IOException {
ByteArrayInputStream inputStream = new ByteArrayInputStream(input.getBytes(StandardCharsets.UTF_8));
InputStream inputStream2 = new CountingInputStream(inputStream, mock(DataCountsReporter.class));
ByteArrayInputStream inputStream = new ByteArrayInputStream(
input.getBytes(StandardCharsets.UTF_8));
InputStream inputStream2 = new CountingInputStream(inputStream,
mock(DataCountsReporter.class));
return XContentFactory.xContent(XContentType.JSON)
.createParser(new NamedXContentRegistry(Collections.emptyList()), inputStream2);
}

View File

@ -116,7 +116,7 @@ public abstract class BaseMlIntegTestCase extends ESIntegTestCase {
protected Job.Builder createJob(String id) {
DataDescription.Builder dataDescription = new DataDescription.Builder();
dataDescription.setFormat(DataDescription.DataFormat.JSON);
dataDescription.setFormat(DataDescription.DataFormat.XCONTENT);
dataDescription.setTimeFormat(DataDescription.EPOCH_MS);
Detector.Builder d = new Detector.Builder("count", null);
@ -133,7 +133,7 @@ public abstract class BaseMlIntegTestCase extends ESIntegTestCase {
public static Job.Builder createFareQuoteJob(String id) {
DataDescription.Builder dataDescription = new DataDescription.Builder();
dataDescription.setFormat(DataDescription.DataFormat.JSON);
dataDescription.setFormat(DataDescription.DataFormat.XCONTENT);
dataDescription.setTimeFormat(DataDescription.EPOCH);
dataDescription.setTimeField("time");
@ -153,7 +153,7 @@ public abstract class BaseMlIntegTestCase extends ESIntegTestCase {
public static Job.Builder createScheduledJob(String jobId) {
DataDescription.Builder dataDescription = new DataDescription.Builder();
dataDescription.setFormat(DataDescription.DataFormat.JSON);
dataDescription.setFormat(DataDescription.DataFormat.XCONTENT);
dataDescription.setTimeFormat("yyyy-MM-dd HH:mm:ss");
Detector.Builder d = new Detector.Builder("count", null);
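With the job format set to XCONTENT, callers posting data supply an explicit XContentType alongside the raw bytes. A minimal sketch of the new call shape; the Request(String jobId) constructor and the literal payload are illustrative assumptions:

// Sketch: the content type now travels with the bytes on the post-data request.
PostDataAction.Request request = new PostDataAction.Request("job-id");
BytesReference content = new BytesArray(
        "{\"time\":1403481600,\"airline\":\"AAL\",\"responsetime\":132.2046}");
request.setContent(content, XContentType.JSON);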

View File

@ -24,7 +24,8 @@
},
"body": {
"description" : "The data to process",
"required" : true
"required" : true,
"serialize" : "bulk"
}
}
}
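The added "serialize" : "bulk" flag follows the convention the bulk API spec uses: the YAML test runner renders a list body as newline-delimited objects rather than a single JSON array, which is what lets the tests below declare their payloads as YAML lists. A rough sketch of that rendering; toBulkBody is an assumed helper, not the runner's actual code:

// Assumed helper: render each document compactly on its own line, bulk-style.
static String toBulkBody(List<Map<String, Object>> docs) throws IOException {
    StringBuilder body = new StringBuilder();
    for (Map<String, Object> doc : docs) {
        body.append(XContentFactory.jsonBuilder().map(doc).string()).append('\n');
    }
    return body.toString();
}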

View File

@ -10,7 +10,7 @@ setup:
"detectors" :[{"function":"count"}]
},
"data_description" : {
"format":"JSON",
"format":"xcontent",
"time_field":"time",
"time_format":"epoch"
}
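This one-line rename repeats through the YAML setups that follow: every data_description that previously said "JSON" now says "xcontent". It mirrors the DataFormat.JSON to DataFormat.XCONTENT rename in the integration test base class above; the lowercase spelling assumes a case-insensitive enum lookup on the server side:

// Equivalent builder call; treating "xcontent" as case-insensitive is an
// assumption based on how these configs spell the value.
DataDescription.Builder dataDescription = new DataDescription.Builder();
dataDescription.setFormat(DataDescription.DataFormat.XCONTENT);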

View File

@ -10,7 +10,7 @@ setup:
"detectors" :[{"function":"count"}]
},
"data_description" : {
"format":"JSON",
"format":"xcontent",
"time_field":"time",
"time_format":"epoch"
}

View File

@ -10,7 +10,7 @@ setup:
"detectors" :[{"function":"count"}]
},
"data_description" : {
"format":"JSON",
"format":"xcontent",
"time_field":"time",
"time_format":"epoch"
}

View File

@ -17,7 +17,7 @@ setup:
"detectors" :[{"function":"metric","field_name":"responsetime","by_field_name":"airline"}]
},
"data_description" : {
"format":"JSON",
"format":"xcontent",
"time_field":"time",
"time_format":"epoch"
}
@ -37,7 +37,7 @@ setup:
"detectors" :[{"function":"metric","field_name":"responsetime","by_field_name":"airline"}]
},
"data_description" : {
"format":"JSON",
"format":"xcontent",
"time_field":"time",
"time_format":"epoch"
}
@ -288,7 +288,7 @@ setup:
"detectors" :[{"function":"metric","field_name":"responsetime","by_field_name":"airline"}]
},
"data_description" : {
"format":"JSON",
"format":"xcontent",
"time_field":"time",
"time_format":"epoch"
}

View File

@ -273,7 +273,7 @@
"detectors" :[{"function":"metric","field_name":"responsetime","by_field_name":"airline"}]
},
"data_description" : {
"format":"JSON",
"format":"xcontent",
"time_field":"time",
"time_format":"yyyy-MM-dd HH:mm:ssX"
}

View File

@ -11,7 +11,7 @@ setup:
"detectors" :[{"function":"count"}]
},
"data_description" : {
"format":"JSON",
"format":"xcontent",
"time_field":"time",
"time_format":"epoch"
}
@ -29,7 +29,7 @@ setup:
"detectors" :[{"function":"metric","field_name":"responsetime","by_field_name":"airline"}]
},
"data_description" : {
"format" : "JSON",
"format" : "xcontent",
"time_field":"time",
"time_format":"yyyy-MM-dd'T'HH:mm:ssX"
}

View File

@ -8,7 +8,7 @@ setup:
"detectors" :[{"function":"metric","field_name":"responsetime","by_field_name":"airline"}]
},
"data_description" : {
"format":"JSON",
"format":"xcontent",
"time_field":"time"
}
}

View File

@ -8,7 +8,7 @@ setup:
"detectors" :[{"function":"metric","field_name":"responsetime","by_field_name":"airline"}]
},
"data_description" : {
"format":"JSON",
"format":"xcontent",
"time_field":"time"
}
}

View File

@ -11,7 +11,7 @@ setup:
"detectors" :[{"function":"metric","field_name":"responsetime","by_field_name":"airline"}]
},
"data_description" : {
"format":"JSON",
"format":"xcontent",
"time_field":"time",
"time_format":"epoch"
}
@ -33,7 +33,7 @@ setup:
"detectors" :[{"function":"metric","field_name":"responsetime","by_field_name":"airline"}]
},
"data_description" : {
"format" : "JSON",
"format" : "xcontent",
"time_field":"time",
"time_format":"yyyy-MM-dd'T'HH:mm:ssX"
}

View File

@ -11,7 +11,7 @@ setup:
"detectors" :[{"function":"metric","field_name":"responsetime","by_field_name":"airline"}]
},
"data_description" : {
"format":"JSON",
"format":"xcontent",
"time_field":"time",
"time_format":"epoch"
}
@ -28,7 +28,7 @@ setup:
"detectors" :[{"function":"metric","field_name":"responsetime","by_field_name":"airline"}]
},
"data_description" : {
"format":"JSON"
"format":"xcontent"
}
}
@ -38,22 +38,24 @@ setup:
---
"Test POST data job api, flush, close and verify DataCounts doc":
- skip:
features: ["headers"]
- do:
#set the header so we won't randomize it
headers:
Content-Type: application/json
xpack.ml.post_data:
job_id: farequote
body: >
{"airline":"AAL","responsetime":"132.2046","sourcetype":"farequote","time":"1403481600"}
{"airline":"JZA","responsetime":"990.4628","sourcetype":"farequote","time":"1403481700"}
body:
- airline: AAL
responsetime: 132.2046
sourcetype: farequote
time: 1403481600
- airline: JZA
responsetime: 990.4628
sourcetype: farequote
time: 1403481700
- match: { processed_record_count: 2 }
- match: { processed_field_count: 4}
- match: { input_bytes: 178 }
- gte: { input_bytes: 170 }
- lte: { input_bytes: 180 }
- match: { input_field_count: 6 }
- match: { invalid_date_count: 0 }
- match: { missing_field_count: 0 }
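The exact input_bytes assertion becomes a range because the runner now serializes the body from the YAML list instead of shipping a fixed string, so the byte count depends on the rendering. A back-of-the-envelope check, assuming compact JSON with unquoted numerics:

// Each document renders to 84 bytes of compact JSON; two newline-terminated
// docs total 170 bytes, the low edge of the asserted [170, 180] window. The
// old verbatim body, which quoted its numbers, weighed exactly 178 bytes.
String doc1 = "{\"airline\":\"AAL\",\"responsetime\":132.2046,\"sourcetype\":\"farequote\",\"time\":1403481600}";
String doc2 = "{\"airline\":\"JZA\",\"responsetime\":990.4628,\"sourcetype\":\"farequote\",\"time\":1403481700}";
int inputBytes = (doc1 + "\n" + doc2 + "\n").getBytes(StandardCharsets.UTF_8).length; // 170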
@ -84,7 +86,8 @@ setup:
- match: { _source.processed_record_count: 2 }
- match: { _source.processed_field_count: 4}
- match: { _source.input_bytes: 178 }
- gte: { _source.input_bytes: 170 }
- lte: { _source.input_bytes: 180 }
- match: { _source.input_field_count: 6 }
- match: { _source.invalid_date_count: 0 }
- match: { _source.missing_field_count: 0 }
@ -111,41 +114,50 @@ setup:
---
"Test POST data with invalid parameters":
- skip:
features: ["headers"]
- do:
#set the header so we won't randomize it
headers:
Content-Type: application/json
catch: missing
xpack.ml.post_data:
job_id: not_a_job
body: {}
body:
- airline: AAL
responsetime: 132.2046
sourcetype: farequote
time: 1403481600
- airline: JZA
responsetime: 990.4628
sourcetype: farequote
time: 1403481700
- do:
#set the header so we won't randomize it
headers:
Content-Type: application/json
catch: /parse_exception/
xpack.ml.post_data:
job_id: farequote
reset_start: not_a_date
body: >
{"airline":"AAL","responsetime":"132.2046","sourcetype":"farequote","time":"1403481600"}
{"airline":"JZA","responsetime":"990.4628","sourcetype":"farequote","time":"1403481600"}
body:
- airline: AAL
responsetime: 132.2046
sourcetype: farequote
time: 1403481600
- airline: JZA
responsetime: 990.4628
sourcetype: farequote
time: 1403481700
- do:
#set the header so we won't randomize it
headers:
Content-Type: application/json
catch: /parse_exception/
xpack.ml.post_data:
job_id: farequote
reset_end: end_not_a_date
body: >
{"airline":"AAL","responsetime":"132.2046","sourcetype":"farequote","time":"1403481600"}
{"airline":"JZA","responsetime":"990.4628","sourcetype":"farequote","time":"1403481600"}
body:
- airline: AAL
responsetime: 132.2046
sourcetype: farequote
time: 1403481600
- airline: JZA
responsetime: 990.4628
sourcetype: farequote
time: 1403481700
---
"Test Flush data with invalid parameters":
@ -186,30 +198,27 @@ setup:
---
"Test flushing, posting and closing a closed job":
- skip:
features: ["headers"]
- do:
#set the header so we won't randomize it
headers:
Content-Type: application/json
catch: /status_exception/
xpack.ml.flush_job:
job_id: closed_job
- do:
#set the header so we won't randomize it
headers:
Content-Type: application/json
catch: /status_exception/
xpack.ml.close_job:
job_id: closed_job
- do:
#set the header so we won't randomize it
headers:
Content-Type: application/json
catch: /status_exception/
xpack.ml.post_data:
job_id: closed_job
body: {}
body:
- airline: AAL
responsetime: 132.2046
sourcetype: farequote
time: 1403481600
- airline: JZA
responsetime: 990.4628
sourcetype: farequote
time: 1403481700
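These tests pin Content-Type to application/json so the randomized runner exercises a known path, but the endpoint itself now accepts any XContent type. A hedged end-to-end sketch of posting a farequote document as SMILE; the Request(String jobId) constructor is assumed, and the builder calls follow the 5.x XContentBuilder API:

// Sketch: encode one document as SMILE and post it with the matching type.
XContentBuilder builder = XContentFactory.smileBuilder();
builder.startObject()
        .field("airline", "AAL")
        .field("responsetime", 132.2046)
        .field("sourcetype", "farequote")
        .field("time", 1403481600)
        .endObject();
PostDataAction.Request request = new PostDataAction.Request("farequote");
request.setContent(builder.bytes(), XContentType.SMILE);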

View File

@ -21,7 +21,7 @@ setup:
"detectors" :[{"function":"metric","field_name":"responsetime","by_field_name":"airline"}]
},
"data_description" : {
"format":"JSON",
"format":"xcontent",
"time_field":"time",
"time_format":"epoch"
}