From 43f1fb2bb141f54a98ed5dcca9b27d5a0ea5a2d8 Mon Sep 17 00:00:00 2001 From: Zachary Tong Date: Mon, 10 Apr 2017 17:55:15 -0400 Subject: [PATCH] [ML] Correctly parse stream of SMILE documents in JsonDataToProcessWriter (elastic/x-pack-elasticsearch#982) The PR detects if SMILE is being provided, then correctly slices the stream such that each document is parsed individually. This is required because jackson's SMILE parser is stricter than it's JSON parser and will stop parsing when it hits a streamSeparator (unlike JSON, which will eagerly try to find more objects to parse). Removes the forced-headers from the various REST tests. relates elastic/x-pack-elasticsearch#642 Original commit: elastic/x-pack-elasticsearch@c0e97cd54501eed354c960024ea961eccc295950 --- .../ml/job/process/CountingInputStream.java | 21 +++++- .../writer/JsonDataToProcessWriter.java | 66 ++++++++++++++++--- .../job/process/CountingInputStreamTests.java | 20 ++++++ .../writer/JsonDataToProcessWriterTests.java | 41 ++++++++++++ .../rest-api-spec/test/ml/index_layout.yaml | 14 +--- .../rest-api-spec/test/ml/jobs_crud.yaml | 6 -- .../rest-api-spec/test/ml/jobs_get_stats.yaml | 10 --- .../rest-api-spec/test/ml/post_data.yaml | 10 +-- 8 files changed, 141 insertions(+), 47 deletions(-) diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/CountingInputStream.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/CountingInputStream.java index 559347d1fa3..43829e2a8af 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/CountingInputStream.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/CountingInputStream.java @@ -18,6 +18,8 @@ import java.io.InputStream; */ public class CountingInputStream extends FilterInputStream { private DataCountsReporter dataCountsReporter; + private int markPosition = 0; + private int currentPosition = 0; /** * @param in @@ -37,6 +39,7 @@ public class CountingInputStream extends FilterInputStream { public int read() throws IOException { int read = in.read(); dataCountsReporter.reportBytesRead(read < 0 ? 0 : 1); + currentPosition += read < 0 ? 0 : 1; return read; } @@ -44,8 +47,8 @@ public class CountingInputStream extends FilterInputStream { @Override public int read(byte[] b) throws IOException { int read = in.read(b); - dataCountsReporter.reportBytesRead(read < 0 ? 0 : read); + currentPosition += read < 0 ? 0 : read; return read; } @@ -53,8 +56,22 @@ public class CountingInputStream extends FilterInputStream { @Override public int read(byte[] b, int off, int len) throws IOException { int read = in.read(b, off, len); - dataCountsReporter.reportBytesRead(read < 0 ? 0 : read); + currentPosition += read < 0 ? 0 : read; + return read; } + + @Override + public synchronized void mark(int readlimit) { + super.mark(readlimit); + markPosition = currentPosition; + } + + @Override + public synchronized void reset() throws IOException { + super.reset(); + dataCountsReporter.reportBytesRead(-(currentPosition - markPosition)); + currentPosition = markPosition; + } } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/JsonDataToProcessWriter.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/JsonDataToProcessWriter.java index 0e2b157266c..47d60bf4ccb 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/JsonDataToProcessWriter.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/JsonDataToProcessWriter.java @@ -58,19 +58,69 @@ class JsonDataToProcessWriter extends AbstractDataToProcessWriter { throws IOException { dataCountsReporter.startNewIncrementalCount(); - try (XContentParser parser = XContentFactory.xContent(xContentType) + if (xContentType.equals(XContentType.JSON)) { + writeJsonXContent(inputStream); + } else if (xContentType.equals(XContentType.SMILE)) { + writeSmileXContent(inputStream); + } else { + throw new RuntimeException("XContentType [" + xContentType + + "] is not supported by JsonDataToProcessWriter"); + } + + // this line can throw and will be propagated + dataCountsReporter.finishReporting( + ActionListener.wrap( + response -> handler.accept(dataCountsReporter.incrementalStats(), null), + e -> handler.accept(null, e) + )); + } + + private void writeJsonXContent(InputStream inputStream) throws IOException { + try (XContentParser parser = XContentFactory.xContent(XContentType.JSON) .createParser(xContentRegistry, inputStream)) { writeJson(parser); - - // this line can throw and will be propagated - dataCountsReporter.finishReporting( - ActionListener.wrap( - response -> handler.accept(dataCountsReporter.incrementalStats(), null), - e -> handler.accept(null, e) - )); } } + private void writeSmileXContent(InputStream inputStream) throws IOException { + while (true) { + byte[] nextObject = findNextObject(XContentType.SMILE.xContent().streamSeparator(), inputStream); + if (nextObject.length == 0) { + break; + } + try (XContentParser parser = XContentFactory.xContent(XContentType.SMILE) + .createParser(xContentRegistry, nextObject)) { + writeJson(parser); + } + } + } + + private byte[] findNextObject(byte marker, InputStream data) throws IOException { + // The underlying stream, MarkSupportingStreamInputWrapper, doesn't care about + // readlimit, so just set to -1. We could pick a value, but I worry that if the + // underlying implementation changes it may cause strange behavior, whereas -1 should + // blow up immediately + assert(data.markSupported()); + data.mark(-1); + + int nextByte; + int counter = 0; + do { + nextByte = data.read(); + counter += 1; + + // marker & 0xFF to deal with Java's lack of unsigned bytes... + if (nextByte == (marker & 0xFF)) { + data.reset(); + byte[] buffer = new byte[counter]; + data.read(buffer); + return buffer; + } + } while (nextByte != -1); + + return new byte[0]; + } + private void writeJson(XContentParser parser) throws IOException { Collection analysisFields = inputFields(); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/CountingInputStreamTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/CountingInputStreamTests.java index ce009cc2f7a..45a5e57af5f 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/CountingInputStreamTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/CountingInputStreamTests.java @@ -55,4 +55,24 @@ public class CountingInputStreamTests extends ESTestCase { } } + public void testRead_WithResets() throws IOException { + + DummyDataCountsReporter dataCountsReporter = new DummyDataCountsReporter(); + + final String TEXT = "To the man who only has a hammer, everything he encounters begins to look like a nail."; + InputStream source = new ByteArrayInputStream(TEXT.getBytes(StandardCharsets.UTF_8)); + + try (CountingInputStream counting = new CountingInputStream(source, dataCountsReporter)) { + while (counting.read() >= 0) { + if (randomInt(10) > 5) { + counting.mark(-1); + } + if (randomInt(10) > 7) { + counting.reset(); + } + } + assertEquals(TEXT.length(), dataCountsReporter.incrementalStats().getInputBytes()); + } + } + } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/JsonDataToProcessWriterTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/JsonDataToProcessWriterTests.java index 02ede97ae24..dda2358320a 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/JsonDataToProcessWriterTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/JsonDataToProcessWriterTests.java @@ -6,8 +6,12 @@ package org.elasticsearch.xpack.ml.job.process.autodetect.writer; import org.elasticsearch.ElasticsearchParseException; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.common.xcontent.XContentGenerator; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xpack.ml.job.config.AnalysisConfig; @@ -281,6 +285,43 @@ public class JsonDataToProcessWriterTests extends ESTestCase { verify(dataCountsReporter).finishReporting(any()); } + public void testWrite_Smile() throws Exception { + + BytesStreamOutput xsonOs = new BytesStreamOutput(); + XContentGenerator xsonGen = XContentFactory.xContent(XContentType.SMILE).createGenerator(xsonOs); + xsonGen.writeStartObject(); + xsonGen.writeStringField("time", "1"); + xsonGen.writeStringField("metric", "foo"); + xsonGen.writeStringField("value", "1.0"); + xsonGen.writeEndObject(); + xsonGen.close(); + xsonOs.writeByte(XContentType.SMILE.xContent().streamSeparator()); + + xsonGen = XContentFactory.xContent(XContentType.SMILE).createGenerator(xsonOs); + xsonGen.writeStartObject(); + xsonGen.writeStringField("time", "2"); + xsonGen.writeStringField("metric", "bar"); + xsonGen.writeStringField("value", "2.0"); + xsonGen.writeEndObject(); + xsonGen.flush(); + xsonOs.writeByte(XContentType.SMILE.xContent().streamSeparator()); + + InputStream inputStream = new ByteArrayInputStream(BytesReference.toBytes(xsonOs.bytes())); + JsonDataToProcessWriter writer = createWriter(); + writer.writeHeader(); + writer.write(inputStream, XContentType.SMILE, (r, e) -> {}); + verify(dataCountsReporter, times(1)).startNewIncrementalCount(); + + List expectedRecords = new ArrayList<>(); + // The final field is the control field + expectedRecords.add(new String[]{"time", "value", "."}); + expectedRecords.add(new String[]{"1", "1.0", ""}); + expectedRecords.add(new String[]{"2", "2.0", ""}); + assertWrittenRecordsEqualTo(expectedRecords); + + verify(dataCountsReporter).finishReporting(any()); + } + private static InputStream createInputStream(String input) { return new ByteArrayInputStream(input.getBytes(StandardCharsets.UTF_8)); } diff --git a/plugin/src/test/resources/rest-api-spec/test/ml/index_layout.yaml b/plugin/src/test/resources/rest-api-spec/test/ml/index_layout.yaml index bf5a269b5bd..e58244a5978 100644 --- a/plugin/src/test/resources/rest-api-spec/test/ml/index_layout.yaml +++ b/plugin/src/test/resources/rest-api-spec/test/ml/index_layout.yaml @@ -1,7 +1,3 @@ -setup: - - skip: - features: ["headers"] - --- "Test CRUD on two jobs in shared index": @@ -53,9 +49,6 @@ setup: job_id: farequote2 - do: - #set the header so we won't randomize it - headers: - Content-Type: application/json xpack.ml.post_data: job_id: farequote body: > @@ -63,13 +56,11 @@ setup: {"airline":"JZA","responsetime":"990.4628","sourcetype":"farequote","time":"1403481700"} - do: - #set the header so we won't randomize it - headers: - Content-Type: application/json xpack.ml.post_data: job_id: farequote2 body: > {"airline":"AAL","responsetime":"132.2046","sourcetype":"farequote","time":"1403481600"} + {"airline":"JZA","responsetime":"990.4628","sourcetype":"farequote","time":"1403481700"} - do: @@ -300,9 +291,6 @@ setup: job_id: farequote - do: - #set the header so we won't randomize it - headers: - Content-Type: application/json xpack.ml.post_data: job_id: farequote body: > diff --git a/plugin/src/test/resources/rest-api-spec/test/ml/jobs_crud.yaml b/plugin/src/test/resources/rest-api-spec/test/ml/jobs_crud.yaml index 62972ad604f..a857a516fd2 100644 --- a/plugin/src/test/resources/rest-api-spec/test/ml/jobs_crud.yaml +++ b/plugin/src/test/resources/rest-api-spec/test/ml/jobs_crud.yaml @@ -323,9 +323,6 @@ job_id: farequote - do: - #set the header so we won't randomize it - headers: - Content-Type: application/json xpack.ml.post_data: job_id: farequote body: > @@ -383,9 +380,6 @@ job_id: farequote - do: - #set the header so we won't randomize it - headers: - Content-Type: application/json xpack.ml.post_data: job_id: farequote body: > diff --git a/plugin/src/test/resources/rest-api-spec/test/ml/jobs_get_stats.yaml b/plugin/src/test/resources/rest-api-spec/test/ml/jobs_get_stats.yaml index dab9e42ceeb..d4df12f7362 100644 --- a/plugin/src/test/resources/rest-api-spec/test/ml/jobs_get_stats.yaml +++ b/plugin/src/test/resources/rest-api-spec/test/ml/jobs_get_stats.yaml @@ -54,13 +54,8 @@ setup: --- "Test get job stats after uploading data prompting the creation of some stats": - - skip: - features: ["headers"] - do: - #set the header so we won't randomize it - headers: - Content-Type: application/json xpack.ml.post_data: job_id: job-stats-test body: > @@ -89,13 +84,8 @@ setup: --- "Test get job stats for closed job": - - skip: - features: ["headers"] - do: - #set the header so we won't randomize it - headers: - Content-Type: application/json xpack.ml.post_data: job_id: job-stats-test body: > diff --git a/plugin/src/test/resources/rest-api-spec/test/ml/post_data.yaml b/plugin/src/test/resources/rest-api-spec/test/ml/post_data.yaml index df59c9cd58d..f6b788d266a 100644 --- a/plugin/src/test/resources/rest-api-spec/test/ml/post_data.yaml +++ b/plugin/src/test/resources/rest-api-spec/test/ml/post_data.yaml @@ -38,13 +38,7 @@ setup: --- "Test POST data job api, flush, close and verify DataCounts doc": - - skip: - features: ["headers"] - - do: - #set the header so we won't randomize it - headers: - Content-Type: application/json xpack.ml.post_data: job_id: farequote body: @@ -59,7 +53,7 @@ setup: - match: { processed_record_count: 2 } - match: { processed_field_count: 4} - - gte: { input_bytes: 170 } + - gte: { input_bytes: 140 } - lte: { input_bytes: 180 } - match: { input_field_count: 6 } - match: { invalid_date_count: 0 } @@ -91,7 +85,7 @@ setup: - match: { _source.processed_record_count: 2 } - match: { _source.processed_field_count: 4} - - gte: { _source.input_bytes: 170 } + - gte: { _source.input_bytes: 140 } - lte: { _source.input_bytes: 180 } - match: { _source.input_field_count: 6 } - match: { _source.invalid_date_count: 0 }