[ML] Correctly parse stream of SMILE documents in JsonDataToProcessWriter (elastic/x-pack-elasticsearch#982)

The PR detects if SMILE is being provided, then correctly slices the stream such that each document is parsed individually. This is required because jackson's SMILE parser is stricter than it's JSON parser and will stop parsing when it hits a streamSeparator (unlike JSON, which will eagerly try to find more objects to parse).

Removes the forced-headers from the various REST tests.

relates elastic/x-pack-elasticsearch#642 

Original commit: elastic/x-pack-elasticsearch@c0e97cd545
This commit is contained in:
Zachary Tong 2017-04-10 17:55:15 -04:00 committed by GitHub
parent 49223a8782
commit 43f1fb2bb1
8 changed files with 141 additions and 47 deletions

View File

@ -18,6 +18,8 @@ import java.io.InputStream;
*/
public class CountingInputStream extends FilterInputStream {
private DataCountsReporter dataCountsReporter;
private int markPosition = 0;
private int currentPosition = 0;
/**
* @param in
@ -37,6 +39,7 @@ public class CountingInputStream extends FilterInputStream {
public int read() throws IOException {
int read = in.read();
dataCountsReporter.reportBytesRead(read < 0 ? 0 : 1);
currentPosition += read < 0 ? 0 : 1;
return read;
}
@ -44,8 +47,8 @@ public class CountingInputStream extends FilterInputStream {
@Override
public int read(byte[] b) throws IOException {
int read = in.read(b);
dataCountsReporter.reportBytesRead(read < 0 ? 0 : read);
currentPosition += read < 0 ? 0 : read;
return read;
}
@ -53,8 +56,22 @@ public class CountingInputStream extends FilterInputStream {
@Override
public int read(byte[] b, int off, int len) throws IOException {
int read = in.read(b, off, len);
dataCountsReporter.reportBytesRead(read < 0 ? 0 : read);
currentPosition += read < 0 ? 0 : read;
return read;
}
@Override
public synchronized void mark(int readlimit) {
super.mark(readlimit);
markPosition = currentPosition;
}
@Override
public synchronized void reset() throws IOException {
super.reset();
dataCountsReporter.reportBytesRead(-(currentPosition - markPosition));
currentPosition = markPosition;
}
}

View File

@ -58,19 +58,69 @@ class JsonDataToProcessWriter extends AbstractDataToProcessWriter {
throws IOException {
dataCountsReporter.startNewIncrementalCount();
try (XContentParser parser = XContentFactory.xContent(xContentType)
if (xContentType.equals(XContentType.JSON)) {
writeJsonXContent(inputStream);
} else if (xContentType.equals(XContentType.SMILE)) {
writeSmileXContent(inputStream);
} else {
throw new RuntimeException("XContentType [" + xContentType
+ "] is not supported by JsonDataToProcessWriter");
}
// this line can throw and will be propagated
dataCountsReporter.finishReporting(
ActionListener.wrap(
response -> handler.accept(dataCountsReporter.incrementalStats(), null),
e -> handler.accept(null, e)
));
}
private void writeJsonXContent(InputStream inputStream) throws IOException {
try (XContentParser parser = XContentFactory.xContent(XContentType.JSON)
.createParser(xContentRegistry, inputStream)) {
writeJson(parser);
// this line can throw and will be propagated
dataCountsReporter.finishReporting(
ActionListener.wrap(
response -> handler.accept(dataCountsReporter.incrementalStats(), null),
e -> handler.accept(null, e)
));
}
}
private void writeSmileXContent(InputStream inputStream) throws IOException {
while (true) {
byte[] nextObject = findNextObject(XContentType.SMILE.xContent().streamSeparator(), inputStream);
if (nextObject.length == 0) {
break;
}
try (XContentParser parser = XContentFactory.xContent(XContentType.SMILE)
.createParser(xContentRegistry, nextObject)) {
writeJson(parser);
}
}
}
private byte[] findNextObject(byte marker, InputStream data) throws IOException {
// The underlying stream, MarkSupportingStreamInputWrapper, doesn't care about
// readlimit, so just set to -1. We could pick a value, but I worry that if the
// underlying implementation changes it may cause strange behavior, whereas -1 should
// blow up immediately
assert(data.markSupported());
data.mark(-1);
int nextByte;
int counter = 0;
do {
nextByte = data.read();
counter += 1;
// marker & 0xFF to deal with Java's lack of unsigned bytes...
if (nextByte == (marker & 0xFF)) {
data.reset();
byte[] buffer = new byte[counter];
data.read(buffer);
return buffer;
}
} while (nextByte != -1);
return new byte[0];
}
private void writeJson(XContentParser parser) throws IOException {
Collection<String> analysisFields = inputFields();

View File

@ -55,4 +55,24 @@ public class CountingInputStreamTests extends ESTestCase {
}
}
public void testRead_WithResets() throws IOException {
DummyDataCountsReporter dataCountsReporter = new DummyDataCountsReporter();
final String TEXT = "To the man who only has a hammer, everything he encounters begins to look like a nail.";
InputStream source = new ByteArrayInputStream(TEXT.getBytes(StandardCharsets.UTF_8));
try (CountingInputStream counting = new CountingInputStream(source, dataCountsReporter)) {
while (counting.read() >= 0) {
if (randomInt(10) > 5) {
counting.mark(-1);
}
if (randomInt(10) > 7) {
counting.reset();
}
}
assertEquals(TEXT.length(), dataCountsReporter.incrementalStats().getInputBytes());
}
}
}

View File

@ -6,8 +6,12 @@
package org.elasticsearch.xpack.ml.job.process.autodetect.writer;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentGenerator;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.ml.job.config.AnalysisConfig;
@ -281,6 +285,43 @@ public class JsonDataToProcessWriterTests extends ESTestCase {
verify(dataCountsReporter).finishReporting(any());
}
public void testWrite_Smile() throws Exception {
BytesStreamOutput xsonOs = new BytesStreamOutput();
XContentGenerator xsonGen = XContentFactory.xContent(XContentType.SMILE).createGenerator(xsonOs);
xsonGen.writeStartObject();
xsonGen.writeStringField("time", "1");
xsonGen.writeStringField("metric", "foo");
xsonGen.writeStringField("value", "1.0");
xsonGen.writeEndObject();
xsonGen.close();
xsonOs.writeByte(XContentType.SMILE.xContent().streamSeparator());
xsonGen = XContentFactory.xContent(XContentType.SMILE).createGenerator(xsonOs);
xsonGen.writeStartObject();
xsonGen.writeStringField("time", "2");
xsonGen.writeStringField("metric", "bar");
xsonGen.writeStringField("value", "2.0");
xsonGen.writeEndObject();
xsonGen.flush();
xsonOs.writeByte(XContentType.SMILE.xContent().streamSeparator());
InputStream inputStream = new ByteArrayInputStream(BytesReference.toBytes(xsonOs.bytes()));
JsonDataToProcessWriter writer = createWriter();
writer.writeHeader();
writer.write(inputStream, XContentType.SMILE, (r, e) -> {});
verify(dataCountsReporter, times(1)).startNewIncrementalCount();
List<String[]> expectedRecords = new ArrayList<>();
// The final field is the control field
expectedRecords.add(new String[]{"time", "value", "."});
expectedRecords.add(new String[]{"1", "1.0", ""});
expectedRecords.add(new String[]{"2", "2.0", ""});
assertWrittenRecordsEqualTo(expectedRecords);
verify(dataCountsReporter).finishReporting(any());
}
private static InputStream createInputStream(String input) {
return new ByteArrayInputStream(input.getBytes(StandardCharsets.UTF_8));
}

View File

@ -1,7 +1,3 @@
setup:
- skip:
features: ["headers"]
---
"Test CRUD on two jobs in shared index":
@ -53,9 +49,6 @@ setup:
job_id: farequote2
- do:
#set the header so we won't randomize it
headers:
Content-Type: application/json
xpack.ml.post_data:
job_id: farequote
body: >
@ -63,13 +56,11 @@ setup:
{"airline":"JZA","responsetime":"990.4628","sourcetype":"farequote","time":"1403481700"}
- do:
#set the header so we won't randomize it
headers:
Content-Type: application/json
xpack.ml.post_data:
job_id: farequote2
body: >
{"airline":"AAL","responsetime":"132.2046","sourcetype":"farequote","time":"1403481600"}
{"airline":"JZA","responsetime":"990.4628","sourcetype":"farequote","time":"1403481700"}
- do:
@ -300,9 +291,6 @@ setup:
job_id: farequote
- do:
#set the header so we won't randomize it
headers:
Content-Type: application/json
xpack.ml.post_data:
job_id: farequote
body: >

View File

@ -323,9 +323,6 @@
job_id: farequote
- do:
#set the header so we won't randomize it
headers:
Content-Type: application/json
xpack.ml.post_data:
job_id: farequote
body: >
@ -383,9 +380,6 @@
job_id: farequote
- do:
#set the header so we won't randomize it
headers:
Content-Type: application/json
xpack.ml.post_data:
job_id: farequote
body: >

View File

@ -54,13 +54,8 @@ setup:
---
"Test get job stats after uploading data prompting the creation of some stats":
- skip:
features: ["headers"]
- do:
#set the header so we won't randomize it
headers:
Content-Type: application/json
xpack.ml.post_data:
job_id: job-stats-test
body: >
@ -89,13 +84,8 @@ setup:
---
"Test get job stats for closed job":
- skip:
features: ["headers"]
- do:
#set the header so we won't randomize it
headers:
Content-Type: application/json
xpack.ml.post_data:
job_id: job-stats-test
body: >

View File

@ -38,13 +38,7 @@ setup:
---
"Test POST data job api, flush, close and verify DataCounts doc":
- skip:
features: ["headers"]
- do:
#set the header so we won't randomize it
headers:
Content-Type: application/json
xpack.ml.post_data:
job_id: farequote
body:
@ -59,7 +53,7 @@ setup:
- match: { processed_record_count: 2 }
- match: { processed_field_count: 4}
- gte: { input_bytes: 170 }
- gte: { input_bytes: 140 }
- lte: { input_bytes: 180 }
- match: { input_field_count: 6 }
- match: { invalid_date_count: 0 }
@ -91,7 +85,7 @@ setup:
- match: { _source.processed_record_count: 2 }
- match: { _source.processed_field_count: 4}
- gte: { _source.input_bytes: 170 }
- gte: { _source.input_bytes: 140 }
- lte: { _source.input_bytes: 180 }
- match: { _source.input_field_count: 6 }
- match: { _source.invalid_date_count: 0 }