[ML] Move consuming and closing results stream to result processor (#36314)

The results iterator currently consumes and closes the results stream
once it is done. This should not be the responsibility of the results
iterator: it prevents the iterator from being reused in contexts where
closing the stream is not desirable. This commit moves the consuming
and closing of the results stream into the autodetect result processor.
This commit is contained in:
Dimitris Athanasiou 2018-12-07 09:33:51 +00:00 committed by GitHub
parent 266b9bc796
commit 0dd73ef7da
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 211 additions and 181 deletions

View File

@ -102,4 +102,12 @@ public interface AutodetectProcess extends NativeProcess {
* @return stream of autodetect results.
*/
Iterator<AutodetectResult> readAutodetectResults();
/**
 * Consumes any bytes remaining in the process output stream and then
 * closes it.
 * <p>
 * Draining the stream before closing is required: if the native process
 * tries to write more output after the read end has been closed it
 * receives a SIGPIPE.
 */
void consumeAndCloseOutputStream();
}

View File

@ -149,6 +149,10 @@ public class BlackHoleAutodetectProcess implements AutodetectProcess {
};
}
@Override
public void consumeAndCloseOutputStream() {
    // No-op: this implementation has no process output stream to drain or close.
}
@Override
public ZonedDateTime getProcessStartTime() {
return startTime;

View File

@ -117,4 +117,17 @@ class NativeAutodetectProcess extends AbstractNativeProcess implements Autodetec
private AutodetectControlMsgWriter newMessageWriter() {
return new AutodetectControlMsgWriter(recordWriter(), numberOfFields());
}
@Override
public void consumeAndCloseOutputStream() {
    // Drain anything the native process has already written before closing;
    // closing the read end while the process is still writing would cause it
    // to receive a SIGPIPE. try-with-resources guarantees the stream is
    // closed even if a read fails (the original leaked the stream in that case).
    try (InputStream processOutput = processOutStream()) {
        byte[] buff = new byte[512];
        while (processOutput.read(buff) >= 0) {
            // Do nothing
        }
    } catch (IOException e) {
        throw new RuntimeException("Error consuming and closing process output stream", e);
    }
}
}

View File

@ -34,8 +34,8 @@ import org.elasticsearch.xpack.core.ml.job.results.ForecastRequestStats;
import org.elasticsearch.xpack.core.ml.job.results.Influencer;
import org.elasticsearch.xpack.core.ml.job.results.ModelPlot;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider;
import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister;
import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider;
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcess;
import org.elasticsearch.xpack.ml.job.process.normalizer.Renormalizer;
import org.elasticsearch.xpack.ml.job.results.AutodetectResult;
@ -135,25 +135,7 @@ public class AutoDetectResultProcessor {
// to kill the results reader thread as autodetect will be blocked
// trying to write its output.
try {
bucketCount = 0;
Iterator<AutodetectResult> iterator = process.readAutodetectResults();
while (iterator.hasNext()) {
try {
AutodetectResult result = iterator.next();
processResult(context, result);
if (result.getBucket() != null) {
LOGGER.trace("[{}] Bucket number {} parsed from output", jobId, bucketCount);
}
} catch (Exception e) {
if (processKilled) {
throw e;
}
if (process.isProcessAliveAfterWaiting() == false) {
throw e;
}
LOGGER.warn(new ParameterizedMessage("[{}] Error processing autodetect result", jobId), e);
}
}
readResults(process, context);
try {
if (processKilled == false) {
@ -188,6 +170,32 @@ public class AutoDetectResultProcessor {
}
}
/**
 * Reads results from the autodetect process and dispatches each one to
 * {@code processResult}. In a finally block the process output stream is
 * drained and closed, so the native process does not get a SIGPIPE if it
 * writes after we stop reading.
 */
private void readResults(AutodetectProcess process, Context context) {
    bucketCount = 0;
    try {
        Iterator<AutodetectResult> iterator = process.readAutodetectResults();
        while (iterator.hasNext()) {
            try {
                AutodetectResult result = iterator.next();
                processResult(context, result);
                if (result.getBucket() != null) {
                    LOGGER.trace("[{}] Bucket number {} parsed from output", jobId, bucketCount);
                }
            } catch (Exception e) {
                // Rethrow when the process was deliberately killed...
                if (processKilled) {
                    throw e;
                }
                // ...or has died on its own; otherwise log and keep reading.
                if (process.isProcessAliveAfterWaiting() == false) {
                    throw e;
                }
                LOGGER.warn(new ParameterizedMessage("[{}] Error processing autodetect result", jobId), e);
            }
        }
    } finally {
        // Always drain and close the stream, even when reading failed.
        process.consumeAndCloseOutputStream();
    }
}
public void setProcessKilled() {
processKilled = true;
renormalizer.shutdown();

View File

@ -38,27 +38,10 @@ public class AutodetectResultsParser {
}
return new AutodetectResultIterator(in, parser);
} catch (IOException e) {
consumeAndCloseStream(in);
throw new ElasticsearchParseException(e.getMessage(), e);
}
}
static void consumeAndCloseStream(InputStream in) {
try {
// read anything left in the stream before
// closing the stream otherwise if the process
// tries to write more after the close it gets
// a SIGPIPE
byte[] buff = new byte[512];
while (in.read(buff) >= 0) {
// Do nothing
}
in.close();
} catch (IOException e) {
throw new RuntimeException("Error closing result parser input stream", e);
}
}
private static class AutodetectResultIterator implements Iterator<AutodetectResult> {
private static final Logger logger = LogManager.getLogger(AutodetectResultIterator.class);
@ -79,15 +62,12 @@ public class AutodetectResultsParser {
token = parser.nextToken();
} catch (IOException e) {
logger.debug("io error while parsing", e);
consumeAndCloseStream(in);
return false;
}
if (token == XContentParser.Token.END_ARRAY) {
consumeAndCloseStream(in);
return false;
} else if (token != XContentParser.Token.START_OBJECT) {
logger.error("Expecting Json Field name token after the Start Object token");
consumeAndCloseStream(in);
throw new ElasticsearchParseException("unexpected token [" + token + "]");
}
return true;

View File

@ -16,6 +16,7 @@ import org.elasticsearch.xpack.ml.job.process.autodetect.writer.AutodetectContro
import org.junit.Assert;
import org.junit.Before;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
@ -29,6 +30,7 @@ import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.is;
import static org.mockito.Matchers.any;
@ -51,8 +53,10 @@ public class NativeAutodetectProcessTests extends ESTestCase {
public void testProcessStartTime() throws Exception {
InputStream logStream = mock(InputStream.class);
when(logStream.read(new byte[1024])).thenReturn(-1);
InputStream outputStream = mock(InputStream.class);
when(outputStream.read(new byte[512])).thenReturn(-1);
try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo", logStream,
mock(OutputStream.class), mock(InputStream.class), mock(OutputStream.class),
mock(OutputStream.class), outputStream, mock(OutputStream.class),
NUMBER_FIELDS, null,
new AutodetectResultsParser(), mock(Runnable.class))) {
process.start(executorService, mock(AutodetectStateProcessor.class), mock(InputStream.class));
@ -70,10 +74,12 @@ public class NativeAutodetectProcessTests extends ESTestCase {
public void testWriteRecord() throws IOException {
InputStream logStream = mock(InputStream.class);
when(logStream.read(new byte[1024])).thenReturn(-1);
InputStream outputStream = mock(InputStream.class);
when(outputStream.read(new byte[512])).thenReturn(-1);
String[] record = {"r1", "r2", "r3", "r4", "r5"};
ByteArrayOutputStream bos = new ByteArrayOutputStream(1024);
try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo", logStream,
bos, mock(InputStream.class), mock(OutputStream.class), NUMBER_FIELDS, Collections.emptyList(),
bos, outputStream, mock(OutputStream.class), NUMBER_FIELDS, Collections.emptyList(),
new AutodetectResultsParser(), mock(Runnable.class))) {
process.start(executorService, mock(AutodetectStateProcessor.class), mock(InputStream.class));
@ -103,9 +109,11 @@ public class NativeAutodetectProcessTests extends ESTestCase {
public void testFlush() throws IOException {
InputStream logStream = mock(InputStream.class);
when(logStream.read(new byte[1024])).thenReturn(-1);
InputStream outputStream = mock(InputStream.class);
when(outputStream.read(new byte[512])).thenReturn(-1);
ByteArrayOutputStream bos = new ByteArrayOutputStream(AutodetectControlMsgWriter.FLUSH_SPACES_LENGTH + 1024);
try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo", logStream,
bos, mock(InputStream.class), mock(OutputStream.class), NUMBER_FIELDS, Collections.emptyList(),
bos, outputStream, mock(OutputStream.class), NUMBER_FIELDS, Collections.emptyList(),
new AutodetectResultsParser(), mock(Runnable.class))) {
process.start(executorService, mock(AutodetectStateProcessor.class), mock(InputStream.class));
@ -130,12 +138,30 @@ public class NativeAutodetectProcessTests extends ESTestCase {
testWriteMessage(p -> p.persistState(), AutodetectControlMsgWriter.BACKGROUND_PERSIST_MESSAGE_CODE);
}
public void testWriteMessage(CheckedConsumer<NativeAutodetectProcess> writeFunction, String expectedMessageCode) throws IOException {
public void testConsumeAndCloseOutputStream() throws IOException {
    // Stub log stream that immediately reports end-of-stream.
    InputStream log = mock(InputStream.class);
    when(log.read(new byte[1024])).thenReturn(-1);
    // Give the process an output stream with unread data left in it.
    byte[] pending = "some string of data".getBytes(StandardCharsets.UTF_8);
    ByteArrayInputStream resultsStream = new ByteArrayInputStream(pending);
    try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo", log,
            mock(OutputStream.class), resultsStream, mock(OutputStream.class), NUMBER_FIELDS,
            Collections.emptyList(), new AutodetectResultsParser(), mock(Runnable.class))) {
        process.consumeAndCloseOutputStream();
        // Everything left in the stream must have been consumed.
        assertThat(resultsStream.available(), equalTo(0));
    }
}
private void testWriteMessage(CheckedConsumer<NativeAutodetectProcess> writeFunction, String expectedMessageCode) throws IOException {
InputStream logStream = mock(InputStream.class);
when(logStream.read(new byte[1024])).thenReturn(-1);
InputStream outputStream = mock(InputStream.class);
when(outputStream.read(new byte[512])).thenReturn(-1);
ByteArrayOutputStream bos = new ByteArrayOutputStream(1024);
try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo", logStream,
bos, mock(InputStream.class), mock(OutputStream.class), NUMBER_FIELDS, Collections.emptyList(),
bos, outputStream, mock(OutputStream.class), NUMBER_FIELDS, Collections.emptyList(),
new AutodetectResultsParser(), mock(Runnable.class))) {
process.start(executorService, mock(AutodetectStateProcessor.class), mock(InputStream.class));

View File

@ -29,8 +29,8 @@ import org.elasticsearch.xpack.core.ml.job.results.Bucket;
import org.elasticsearch.xpack.core.ml.job.results.CategoryDefinition;
import org.elasticsearch.xpack.core.ml.job.results.Influencer;
import org.elasticsearch.xpack.core.ml.job.results.ModelPlot;
import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider;
import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister;
import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider;
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcess;
import org.elasticsearch.xpack.ml.job.process.normalizer.Renormalizer;
import org.elasticsearch.xpack.ml.job.results.AutodetectResult;

View File

@ -22,12 +22,6 @@ import java.util.Date;
import java.util.List;
import java.util.stream.Collectors;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
/**
* Tests for parsing the JSON output of autodetect
*/
@ -236,169 +230,179 @@ public class AutodetectResultsParserTests extends ESTestCase {
+ "\"event_count\":1159}" + "]";
public void testParser() throws IOException {
InputStream inputStream = new ByteArrayInputStream(METRIC_OUTPUT_SAMPLE.getBytes(StandardCharsets.UTF_8));
AutodetectResultsParser parser = new AutodetectResultsParser();
List<AutodetectResult> results = new ArrayList<>();
parser.parseResults(inputStream).forEachRemaining(results::add);
List<Bucket> buckets = results.stream().map(AutodetectResult::getBucket)
try (InputStream inputStream = new ByteArrayInputStream(METRIC_OUTPUT_SAMPLE.getBytes(StandardCharsets.UTF_8))) {
AutodetectResultsParser parser = new AutodetectResultsParser();
List<AutodetectResult> results = new ArrayList<>();
parser.parseResults(inputStream).forEachRemaining(results::add);
List<Bucket> buckets = results.stream().map(AutodetectResult::getBucket)
.filter(b -> b != null)
.collect(Collectors.toList());
assertEquals(2, buckets.size());
assertEquals(new Date(1359450000000L), buckets.get(0).getTimestamp());
assertEquals(2, buckets.size());
assertEquals(new Date(1359450000000L), buckets.get(0).getTimestamp());
assertEquals(buckets.get(0).getEventCount(), 806);
assertEquals(buckets.get(0).getEventCount(), 806);
List<BucketInfluencer> bucketInfluencers = buckets.get(0).getBucketInfluencers();
assertEquals(1, bucketInfluencers.size());
assertEquals(0.0, bucketInfluencers.get(0).getRawAnomalyScore(), EPSILON);
assertEquals(0.0, bucketInfluencers.get(0).getAnomalyScore(), EPSILON);
assertEquals(0.0, bucketInfluencers.get(0).getProbability(), EPSILON);
assertEquals("bucket_time", bucketInfluencers.get(0).getInfluencerFieldName());
List<BucketInfluencer> bucketInfluencers = buckets.get(0).getBucketInfluencers();
assertEquals(1, bucketInfluencers.size());
assertEquals(0.0, bucketInfluencers.get(0).getRawAnomalyScore(), EPSILON);
assertEquals(0.0, bucketInfluencers.get(0).getAnomalyScore(), EPSILON);
assertEquals(0.0, bucketInfluencers.get(0).getProbability(), EPSILON);
assertEquals("bucket_time", bucketInfluencers.get(0).getInfluencerFieldName());
assertEquals(new Date(1359453600000L), buckets.get(1).getTimestamp());
assertEquals(new Date(1359453600000L), buckets.get(1).getTimestamp());
assertEquals(buckets.get(1).getEventCount(), 820);
bucketInfluencers = buckets.get(1).getBucketInfluencers();
assertEquals(2, bucketInfluencers.size());
assertEquals(0.0140005, bucketInfluencers.get(0).getRawAnomalyScore(), EPSILON);
assertEquals(20.22688, bucketInfluencers.get(0).getAnomalyScore(), EPSILON);
assertEquals(0.01, bucketInfluencers.get(0).getProbability(), EPSILON);
assertEquals("bucket_time", bucketInfluencers.get(0).getInfluencerFieldName());
assertEquals(0.005, bucketInfluencers.get(1).getRawAnomalyScore(), EPSILON);
assertEquals(10.5, bucketInfluencers.get(1).getAnomalyScore(), EPSILON);
assertEquals(0.03, bucketInfluencers.get(1).getProbability(), EPSILON);
assertEquals("foo", bucketInfluencers.get(1).getInfluencerFieldName());
assertEquals(buckets.get(1).getEventCount(), 820);
bucketInfluencers = buckets.get(1).getBucketInfluencers();
assertEquals(2, bucketInfluencers.size());
assertEquals(0.0140005, bucketInfluencers.get(0).getRawAnomalyScore(), EPSILON);
assertEquals(20.22688, bucketInfluencers.get(0).getAnomalyScore(), EPSILON);
assertEquals(0.01, bucketInfluencers.get(0).getProbability(), EPSILON);
assertEquals("bucket_time", bucketInfluencers.get(0).getInfluencerFieldName());
assertEquals(0.005, bucketInfluencers.get(1).getRawAnomalyScore(), EPSILON);
assertEquals(10.5, bucketInfluencers.get(1).getAnomalyScore(), EPSILON);
assertEquals(0.03, bucketInfluencers.get(1).getProbability(), EPSILON);
assertEquals("foo", bucketInfluencers.get(1).getInfluencerFieldName());
Bucket secondBucket = buckets.get(1);
Bucket secondBucket = buckets.get(1);
assertEquals(0.0637541, secondBucket.getRecords().get(0).getProbability(), EPSILON);
assertEquals("airline", secondBucket.getRecords().get(0).getByFieldName());
assertEquals("JZA", secondBucket.getRecords().get(0).getByFieldValue());
assertEquals(1020.08, secondBucket.getRecords().get(0).getTypical().get(0), EPSILON);
assertEquals(1042.14, secondBucket.getRecords().get(0).getActual().get(0), EPSILON);
assertEquals("responsetime", secondBucket.getRecords().get(0).getFieldName());
assertEquals("max", secondBucket.getRecords().get(0).getFunction());
assertEquals("", secondBucket.getRecords().get(0).getPartitionFieldName());
assertEquals("", secondBucket.getRecords().get(0).getPartitionFieldValue());
assertEquals(0.0637541, secondBucket.getRecords().get(0).getProbability(), EPSILON);
assertEquals("airline", secondBucket.getRecords().get(0).getByFieldName());
assertEquals("JZA", secondBucket.getRecords().get(0).getByFieldValue());
assertEquals(1020.08, secondBucket.getRecords().get(0).getTypical().get(0), EPSILON);
assertEquals(1042.14, secondBucket.getRecords().get(0).getActual().get(0), EPSILON);
assertEquals("responsetime", secondBucket.getRecords().get(0).getFieldName());
assertEquals("max", secondBucket.getRecords().get(0).getFunction());
assertEquals("", secondBucket.getRecords().get(0).getPartitionFieldName());
assertEquals("", secondBucket.getRecords().get(0).getPartitionFieldValue());
assertEquals(0.00748292, secondBucket.getRecords().get(1).getProbability(), EPSILON);
assertEquals("airline", secondBucket.getRecords().get(1).getByFieldName());
assertEquals("AMX", secondBucket.getRecords().get(1).getByFieldValue());
assertEquals(20.2137, secondBucket.getRecords().get(1).getTypical().get(0), EPSILON);
assertEquals(22.8855, secondBucket.getRecords().get(1).getActual().get(0), EPSILON);
assertEquals("responsetime", secondBucket.getRecords().get(1).getFieldName());
assertEquals("max", secondBucket.getRecords().get(1).getFunction());
assertEquals("", secondBucket.getRecords().get(1).getPartitionFieldName());
assertEquals("", secondBucket.getRecords().get(1).getPartitionFieldValue());
assertEquals(0.00748292, secondBucket.getRecords().get(1).getProbability(), EPSILON);
assertEquals("airline", secondBucket.getRecords().get(1).getByFieldName());
assertEquals("AMX", secondBucket.getRecords().get(1).getByFieldValue());
assertEquals(20.2137, secondBucket.getRecords().get(1).getTypical().get(0), EPSILON);
assertEquals(22.8855, secondBucket.getRecords().get(1).getActual().get(0), EPSILON);
assertEquals("responsetime", secondBucket.getRecords().get(1).getFieldName());
assertEquals("max", secondBucket.getRecords().get(1).getFunction());
assertEquals("", secondBucket.getRecords().get(1).getPartitionFieldName());
assertEquals("", secondBucket.getRecords().get(1).getPartitionFieldValue());
assertEquals(0.023494, secondBucket.getRecords().get(2).getProbability(), EPSILON);
assertEquals("airline", secondBucket.getRecords().get(2).getByFieldName());
assertEquals("DAL", secondBucket.getRecords().get(2).getByFieldValue());
assertEquals(382.177, secondBucket.getRecords().get(2).getTypical().get(0), EPSILON);
assertEquals(358.934, secondBucket.getRecords().get(2).getActual().get(0), EPSILON);
assertEquals("responsetime", secondBucket.getRecords().get(2).getFieldName());
assertEquals("min", secondBucket.getRecords().get(2).getFunction());
assertEquals("", secondBucket.getRecords().get(2).getPartitionFieldName());
assertEquals("", secondBucket.getRecords().get(2).getPartitionFieldValue());
assertEquals(0.023494, secondBucket.getRecords().get(2).getProbability(), EPSILON);
assertEquals("airline", secondBucket.getRecords().get(2).getByFieldName());
assertEquals("DAL", secondBucket.getRecords().get(2).getByFieldValue());
assertEquals(382.177, secondBucket.getRecords().get(2).getTypical().get(0), EPSILON);
assertEquals(358.934, secondBucket.getRecords().get(2).getActual().get(0), EPSILON);
assertEquals("responsetime", secondBucket.getRecords().get(2).getFieldName());
assertEquals("min", secondBucket.getRecords().get(2).getFunction());
assertEquals("", secondBucket.getRecords().get(2).getPartitionFieldName());
assertEquals("", secondBucket.getRecords().get(2).getPartitionFieldValue());
assertEquals(0.0473552, secondBucket.getRecords().get(3).getProbability(), EPSILON);
assertEquals("airline", secondBucket.getRecords().get(3).getByFieldName());
assertEquals("SWA", secondBucket.getRecords().get(3).getByFieldValue());
assertEquals(152.148, secondBucket.getRecords().get(3).getTypical().get(0), EPSILON);
assertEquals(96.6425, secondBucket.getRecords().get(3).getActual().get(0), EPSILON);
assertEquals("responsetime", secondBucket.getRecords().get(3).getFieldName());
assertEquals("min", secondBucket.getRecords().get(3).getFunction());
assertEquals("", secondBucket.getRecords().get(3).getPartitionFieldName());
assertEquals("", secondBucket.getRecords().get(3).getPartitionFieldValue());
assertEquals(0.0473552, secondBucket.getRecords().get(3).getProbability(), EPSILON);
assertEquals("airline", secondBucket.getRecords().get(3).getByFieldName());
assertEquals("SWA", secondBucket.getRecords().get(3).getByFieldValue());
assertEquals(152.148, secondBucket.getRecords().get(3).getTypical().get(0), EPSILON);
assertEquals(96.6425, secondBucket.getRecords().get(3).getActual().get(0), EPSILON);
assertEquals("responsetime", secondBucket.getRecords().get(3).getFieldName());
assertEquals("min", secondBucket.getRecords().get(3).getFunction());
assertEquals("", secondBucket.getRecords().get(3).getPartitionFieldName());
assertEquals("", secondBucket.getRecords().get(3).getPartitionFieldValue());
List<Quantiles> quantiles = results.stream().map(AutodetectResult::getQuantiles)
List<Quantiles> quantiles = results.stream().map(AutodetectResult::getQuantiles)
.filter(q -> q != null)
.collect(Collectors.toList());
assertEquals(3, quantiles.size());
assertEquals("foo", quantiles.get(0).getJobId());
assertEquals(new Date(1359450000000L), quantiles.get(0).getTimestamp());
assertEquals("[normalizer 1.1, normalizer 2.1]", quantiles.get(0).getQuantileState());
assertEquals("foo", quantiles.get(1).getJobId());
assertEquals(new Date(1359453600000L), quantiles.get(1).getTimestamp());
assertEquals("[normalizer 1.2, normalizer 2.2]", quantiles.get(1).getQuantileState());
assertEquals("foo", quantiles.get(2).getJobId());
assertEquals(new Date(1359453600000L), quantiles.get(2).getTimestamp());
assertEquals("[normalizer 1.3, normalizer 2.3]", quantiles.get(2).getQuantileState());
assertEquals(3, quantiles.size());
assertEquals("foo", quantiles.get(0).getJobId());
assertEquals(new Date(1359450000000L), quantiles.get(0).getTimestamp());
assertEquals("[normalizer 1.1, normalizer 2.1]", quantiles.get(0).getQuantileState());
assertEquals("foo", quantiles.get(1).getJobId());
assertEquals(new Date(1359453600000L), quantiles.get(1).getTimestamp());
assertEquals("[normalizer 1.2, normalizer 2.2]", quantiles.get(1).getQuantileState());
assertEquals("foo", quantiles.get(2).getJobId());
assertEquals(new Date(1359453600000L), quantiles.get(2).getTimestamp());
assertEquals("[normalizer 1.3, normalizer 2.3]", quantiles.get(2).getQuantileState());
}
}
@AwaitsFix(bugUrl = "rewrite this test so it doesn't use ~200 lines of json")
public void testPopulationParser() throws IOException {
InputStream inputStream = new ByteArrayInputStream(POPULATION_OUTPUT_SAMPLE.getBytes(StandardCharsets.UTF_8));
AutodetectResultsParser parser = new AutodetectResultsParser();
List<AutodetectResult> results = new ArrayList<>();
parser.parseResults(inputStream).forEachRemaining(results::add);
List<Bucket> buckets = results.stream().map(AutodetectResult::getBucket)
try (InputStream inputStream = new ByteArrayInputStream(POPULATION_OUTPUT_SAMPLE.getBytes(StandardCharsets.UTF_8))) {
AutodetectResultsParser parser = new AutodetectResultsParser();
List<AutodetectResult> results = new ArrayList<>();
parser.parseResults(inputStream).forEachRemaining(results::add);
List<Bucket> buckets = results.stream().map(AutodetectResult::getBucket)
.filter(b -> b != null)
.collect(Collectors.toList());
assertEquals(2, buckets.size());
assertEquals(new Date(1379590200000L), buckets.get(0).getTimestamp());
assertEquals(buckets.get(0).getEventCount(), 1235);
assertEquals(2, buckets.size());
assertEquals(new Date(1379590200000L), buckets.get(0).getTimestamp());
assertEquals(buckets.get(0).getEventCount(), 1235);
Bucket firstBucket = buckets.get(0);
assertEquals(1.38951e-08, firstBucket.getRecords().get(0).getProbability(), EPSILON);
assertEquals("sum_cs_bytes_", firstBucket.getRecords().get(0).getFieldName());
assertEquals("max", firstBucket.getRecords().get(0).getFunction());
assertEquals("cs_host", firstBucket.getRecords().get(0).getOverFieldName());
assertEquals("mail.google.com", firstBucket.getRecords().get(0).getOverFieldValue());
assertNotNull(firstBucket.getRecords().get(0).getCauses());
Bucket firstBucket = buckets.get(0);
assertEquals(1.38951e-08, firstBucket.getRecords().get(0).getProbability(), EPSILON);
assertEquals("sum_cs_bytes_", firstBucket.getRecords().get(0).getFieldName());
assertEquals("max", firstBucket.getRecords().get(0).getFunction());
assertEquals("cs_host", firstBucket.getRecords().get(0).getOverFieldName());
assertEquals("mail.google.com", firstBucket.getRecords().get(0).getOverFieldValue());
assertNotNull(firstBucket.getRecords().get(0).getCauses());
assertEquals(new Date(1379590800000L), buckets.get(1).getTimestamp());
assertEquals(buckets.get(1).getEventCount(), 1159);
assertEquals(new Date(1379590800000L), buckets.get(1).getTimestamp());
assertEquals(buckets.get(1).getEventCount(), 1159);
}
}
public void testParse_GivenEmptyArray() throws ElasticsearchParseException, IOException {
String json = "[]";
InputStream inputStream = new ByteArrayInputStream(json.getBytes(StandardCharsets.UTF_8));
AutodetectResultsParser parser = new AutodetectResultsParser();
assertFalse(parser.parseResults(inputStream).hasNext());
try (InputStream inputStream = new ByteArrayInputStream(json.getBytes(StandardCharsets.UTF_8))) {
AutodetectResultsParser parser = new AutodetectResultsParser();
assertFalse(parser.parseResults(inputStream).hasNext());
}
}
public void testParse_GivenModelSizeStats() throws ElasticsearchParseException, IOException {
String json = "[{\"model_size_stats\": {\"job_id\": \"foo\", \"model_bytes\":300}}]";
InputStream inputStream = new ByteArrayInputStream(json.getBytes(StandardCharsets.UTF_8));
try (InputStream inputStream = new ByteArrayInputStream(json.getBytes(StandardCharsets.UTF_8))) {
AutodetectResultsParser parser = new AutodetectResultsParser();
List<AutodetectResult> results = new ArrayList<>();
parser.parseResults(inputStream).forEachRemaining(results::add);
AutodetectResultsParser parser = new AutodetectResultsParser();
List<AutodetectResult> results = new ArrayList<>();
parser.parseResults(inputStream).forEachRemaining(results::add);
assertEquals(1, results.size());
assertEquals(300, results.get(0).getModelSizeStats().getModelBytes());
assertEquals(1, results.size());
assertEquals(300, results.get(0).getModelSizeStats().getModelBytes());
}
}
public void testParse_GivenCategoryDefinition() throws IOException {
String json = "[{\"category_definition\": {\"job_id\":\"foo\", \"category_id\":18}}]";
InputStream inputStream = new ByteArrayInputStream(json.getBytes(StandardCharsets.UTF_8));
AutodetectResultsParser parser = new AutodetectResultsParser();
List<AutodetectResult> results = new ArrayList<>();
parser.parseResults(inputStream).forEachRemaining(results::add);
try (InputStream inputStream = new ByteArrayInputStream(json.getBytes(StandardCharsets.UTF_8))) {
AutodetectResultsParser parser = new AutodetectResultsParser();
List<AutodetectResult> results = new ArrayList<>();
parser.parseResults(inputStream).forEachRemaining(results::add);
assertEquals(1, results.size());
assertEquals(18, results.get(0).getCategoryDefinition().getCategoryId());
assertEquals(1, results.size());
assertEquals(18, results.get(0).getCategoryDefinition().getCategoryId());
}
}
public void testParse_GivenUnknownObject() throws ElasticsearchParseException, IOException {
String json = "[{\"unknown\":{\"id\": 18}}]";
InputStream inputStream = new ByteArrayInputStream(json.getBytes(StandardCharsets.UTF_8));
AutodetectResultsParser parser = new AutodetectResultsParser();
XContentParseException e = expectThrows(XContentParseException.class,
() -> parser.parseResults(inputStream).forEachRemaining(a -> {}));
assertEquals("[1:3] [autodetect_result] unknown field [unknown], parser not found", e.getMessage());
try (InputStream inputStream = new ByteArrayInputStream(json.getBytes(StandardCharsets.UTF_8))) {
AutodetectResultsParser parser = new AutodetectResultsParser();
XContentParseException e = expectThrows(XContentParseException.class,
() -> parser.parseResults(inputStream).forEachRemaining(a -> {
}));
assertEquals("[1:3] [autodetect_result] unknown field [unknown], parser not found", e.getMessage());
}
}
public void testParse_GivenArrayContainsAnotherArray() throws ElasticsearchParseException, IOException {
String json = "[[]]";
InputStream inputStream = new ByteArrayInputStream(json.getBytes(StandardCharsets.UTF_8));
AutodetectResultsParser parser = new AutodetectResultsParser();
ElasticsearchParseException e = expectThrows(ElasticsearchParseException.class,
() -> parser.parseResults(inputStream).forEachRemaining(a -> {}));
assertEquals("unexpected token [START_ARRAY]", e.getMessage());
try (InputStream inputStream = new ByteArrayInputStream(json.getBytes(StandardCharsets.UTF_8))) {
AutodetectResultsParser parser = new AutodetectResultsParser();
ElasticsearchParseException e = expectThrows(ElasticsearchParseException.class,
() -> parser.parseResults(inputStream).forEachRemaining(a -> {
}));
assertEquals("unexpected token [START_ARRAY]", e.getMessage());
}
}
/**
@ -415,17 +419,4 @@ public class AutodetectResultsParserTests extends ESTestCase {
expectThrows(XContentParseException.class,
() -> parser.parseResults(inputStream).forEachRemaining(a -> {}));
}
public void testConsumeAndCloseStream() throws IOException {
String json = "some string of data";
ByteArrayInputStream inputStream = new ByteArrayInputStream(json.getBytes(StandardCharsets.UTF_8));
AutodetectResultsParser.consumeAndCloseStream(inputStream);
assertEquals(0, inputStream.available());
InputStream mockStream = mock(InputStream.class);
when(mockStream.read(any())).thenReturn(-1);
AutodetectResultsParser.consumeAndCloseStream(mockStream);
verify(mockStream, times(1)).close();
}
}