diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/manager/AutodetectProcessManager.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/manager/AutodetectProcessManager.java index 40c80d109d4..124a6f2feb4 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/manager/AutodetectProcessManager.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/manager/AutodetectProcessManager.java @@ -145,7 +145,7 @@ public class AutodetectProcessManager extends AbstractComponent implements DataP // TODO check for errors from autodetect } catch (IOException ioe) { String msg = String.format(Locale.ROOT, "[%s] exception while flushing job", jobId); - logger.warn(msg); + logger.error(msg); throw ExceptionsHelper.serverError(msg, ioe); } } @@ -164,6 +164,13 @@ public class AutodetectProcessManager extends AbstractComponent implements DataP public void openJob(String jobId, boolean ignoreDowntime) { autoDetectCommunicatorByJob.computeIfAbsent(jobId, id -> { AutodetectCommunicator communicator = create(id, ignoreDowntime); + try { + communicator.writeJobInputHeader(); + } catch (IOException ioe) { + String msg = String.format(Locale.ROOT, "[%s] exception while opening job", jobId); + logger.error(msg); + throw ExceptionsHelper.serverError(msg, ioe); + } setJobStatus(jobId, JobStatus.OPENED); return communicator; }); diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/autodetect/AutodetectCommunicator.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/autodetect/AutodetectCommunicator.java index f14654ec609..be3d330d5e9 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/autodetect/AutodetectCommunicator.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/autodetect/AutodetectCommunicator.java @@ -74,6 +74,10 @@ public class AutodetectCommunicator implements Closeable { new TransformConfigs(job.getTransforms()) , statusReporter, LOGGER); } + public void writeJobInputHeader() throws IOException { + autoDetectWriter.writeHeader(); + } + public DataCounts writeToJob(InputStream inputStream, DataLoadParams params, Supplier cancelled) throws IOException { return checkAndRun(() -> Messages.getMessage(Messages.JOB_DATA_CONCURRENT_USE_UPLOAD, jobId), () -> { if (params.isResettingBuckets()) { diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/autodetect/writer/AbstractDataToProcessWriter.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/autodetect/writer/AbstractDataToProcessWriter.java index d0eae921d4e..f1c76e9453c 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/autodetect/writer/AbstractDataToProcessWriter.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/autodetect/writer/AbstractDataToProcessWriter.java @@ -107,10 +107,8 @@ public abstract class AbstractDataToProcessWriter implements DataToProcessWriter *

* Transforms can be chained so some write their outputs to * a scratch area which is input to another transform - *

- * Writes the header. */ - public void buildTransformsAndWriteHeader(String[] header) throws IOException { + public void buildTransforms(String[] header) throws IOException { Collection inputFields = inputFields(); inFieldIndexes = inputFieldIndexes(header, inputFields); checkForMissingFields(inputFields, inFieldIndexes, header); @@ -124,13 +122,11 @@ public abstract class AbstractDataToProcessWriter implements DataToProcessWriter scratchArea = new String[scratchAreaIndexes.size()]; readWriteArea[TransformFactory.SCRATCH_ARRAY_INDEX] = scratchArea; - buildDateTransform(scratchAreaIndexes, outFieldIndexes); List dateInputTransforms = DependencySorter.findDependencies( dataDescription.getTimeField(), transformConfigs.getTransforms()); - TransformFactory transformFactory = new TransformFactory(); for (TransformConfig config : dateInputTransforms) { Transform tr = transformFactory.create(config, inFieldIndexes, scratchAreaIndexes, @@ -152,8 +148,29 @@ public abstract class AbstractDataToProcessWriter implements DataToProcessWriter outFieldIndexes, logger); this.postDateTransforms.add(tr); } + } - writeHeader(outFieldIndexes); + /** + * Write the header. + * The header is created from the list of analysis input fields, + * the time field and the control field + */ + @Override + public void writeHeader() throws IOException { + Map outFieldIndexes = outputFieldIndexes(); + + // header is all the analysis input fields + the time field + control field + int numFields = outFieldIndexes.size(); + String[] record = new String[numFields]; + + Iterator> itr = outFieldIndexes.entrySet().iterator(); + while (itr.hasNext()) { + Map.Entry entry = itr.next(); + record[entry.getValue()] = entry.getKey(); + } + + // Write the header + autodetectProcess.writeRecord(record); } protected void buildDateTransform(Map scratchAreaIndexes, Map outFieldIndexes) { @@ -180,7 +197,6 @@ public abstract class AbstractDataToProcessWriter implements DataToProcessWriter } } - List writeIndexes = new ArrayList<>(); writeIndexes.add(new TransformIndex(TransformFactory.OUTPUT_ARRAY_INDEX, outFieldIndexes.get(dataDescription.getTimeField()))); @@ -195,7 +211,6 @@ public abstract class AbstractDataToProcessWriter implements DataToProcessWriter dateTransform = new DoubleDateTransform(dataDescription.isEpochMs(), readIndexes, writeIndexes, logger); } - } /** @@ -209,7 +224,7 @@ public abstract class AbstractDataToProcessWriter implements DataToProcessWriter * * @param cancelled Determines whether the process writting has been cancelled * @param input The record the transforms should read their input from. The contents should - * align with the header parameter passed to {@linkplain #buildTransformsAndWriteHeader(String[])} + * align with the header parameter passed to {@linkplain #buildTransforms(String[])} * @param output The record that will be written to the length encoded writer. * This should be the same size as the number of output (analysis fields) i.e. * the size of the map returned by {@linkplain #outputFieldIndexes()} @@ -284,27 +299,6 @@ public abstract class AbstractDataToProcessWriter implements DataToProcessWriter return true; } - - /** - * Write the header. - * The header is created from the list of analysis input fields, - * the time field and the control field - */ - protected void writeHeader(Map outFieldIndexes) throws IOException { - // header is all the analysis input fields + the time field + control field - int numFields = outFieldIndexes.size(); - String[] record = new String[numFields]; - - Iterator> itr = outFieldIndexes.entrySet().iterator(); - while (itr.hasNext()) { - Map.Entry entry = itr.next(); - record[entry.getValue()] = entry.getKey(); - } - - // Write the header - autodetectProcess.writeRecord(record); - } - @Override public void flush() throws IOException { autodetectProcess.flushStream(); @@ -471,7 +465,7 @@ public abstract class AbstractDataToProcessWriter implements DataToProcessWriter * Either return true or throw a MissingFieldException *

* Every input field should have an entry in inputFieldIndexes - * otherwise the field cannnot be found. + * otherwise the field cannot be found. */ protected abstract boolean checkForMissingFields(Collection inputFields, Map inputFieldIndexes, String[] header); diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/autodetect/writer/CsvDataToProcessWriter.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/autodetect/writer/CsvDataToProcessWriter.java index 167dfd20284..1ff39ff5061 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/autodetect/writer/CsvDataToProcessWriter.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/autodetect/writer/CsvDataToProcessWriter.java @@ -81,7 +81,7 @@ class CsvDataToProcessWriter extends AbstractDataToProcessWriter { long inputFieldCount = Math.max(header.length - 1, 0); // time field doesn't count - buildTransformsAndWriteHeader(header); + buildTransforms(header); //backing array for the inputIndex String[] inputRecord = new String[header.length]; @@ -146,8 +146,7 @@ class CsvDataToProcessWriter extends AbstractDataToProcessWriter { } Integer index = inputFieldIndexes.get(field); if (index == null) { - String msg = String.format(Locale.ROOT, "Field configured for analysis " - + "'%s' is not in the CSV header '%s'", + String msg = String.format(Locale.ROOT, "Field configured for analysis '%s' is not in the CSV header '%s'", field, Arrays.toString(header)); logger.error(msg); diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/autodetect/writer/DataToProcessWriter.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/autodetect/writer/DataToProcessWriter.java index eade5b79b62..05e9843add1 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/autodetect/writer/DataToProcessWriter.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/autodetect/writer/DataToProcessWriter.java @@ -16,6 +16,14 @@ import org.elasticsearch.xpack.prelert.job.DataCounts; * inputstream to outputstream as the process expects. */ public interface DataToProcessWriter { + + /** + * Write the header. + * The header is created from the list of analysis input fields, + * the time field and the control field. + */ + void writeHeader() throws IOException; + /** * Reads the inputIndex, transform to length encoded values and pipe * to the OutputStream. diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/autodetect/writer/JsonDataToProcessWriter.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/autodetect/writer/JsonDataToProcessWriter.java index be2937a0ba3..186f3dc1d28 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/autodetect/writer/JsonDataToProcessWriter.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/autodetect/writer/JsonDataToProcessWriter.java @@ -64,7 +64,7 @@ class JsonDataToProcessWriter extends AbstractDataToProcessWriter { private void writeJson(JsonParser parser, Supplier cancelled) throws IOException { Collection analysisFields = inputFields(); - buildTransformsAndWriteHeader(analysisFields.toArray(new String[0])); + buildTransforms(analysisFields.toArray(new String[0])); int numFields = outputFieldCount(); String[] input = new String[numFields]; diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/autodetect/writer/SingleLineDataToProcessWriter.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/autodetect/writer/SingleLineDataToProcessWriter.java index 780fa5762b9..9da93252622 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/autodetect/writer/SingleLineDataToProcessWriter.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/autodetect/writer/SingleLineDataToProcessWriter.java @@ -49,7 +49,7 @@ public class SingleLineDataToProcessWriter extends AbstractDataToProcessWriter { try (BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8))) { String[] header = {RAW}; - buildTransformsAndWriteHeader(header); + buildTransforms(header); int numFields = outputFieldCount(); String[] record = new String[numFields]; diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/process/autodetect/writer/AbstractDataToProcessWriterTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/process/autodetect/writer/AbstractDataToProcessWriterTests.java index 8f7ec9544a8..3098d46288e 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/process/autodetect/writer/AbstractDataToProcessWriterTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/process/autodetect/writer/AbstractDataToProcessWriterTests.java @@ -79,6 +79,8 @@ public class AbstractDataToProcessWriterTests extends ESTestCase { AbstractDataToProcessWriter writer = new CsvDataToProcessWriter(true, autodetectProcess, dd.build(), ac, transforms, statusReporter, jobLogger); + writer.writeHeader(); + Set inputFields = new HashSet<>(writer.inputFields()); assertEquals(4, inputFields.size()); assertTrue(inputFields.contains("time_field")); @@ -87,7 +89,7 @@ public class AbstractDataToProcessWriterTests extends ESTestCase { assertTrue(inputFields.contains("metric")); String[] header = { "time_field", "metric", "host", "value" }; - writer.buildTransformsAndWriteHeader(header); + writer.buildTransforms(header); List trs = writer.postDateTransforms; assertEquals(1, trs.size()); Transform tr = trs.get(0); @@ -136,6 +138,8 @@ public class AbstractDataToProcessWriterTests extends ESTestCase { AbstractDataToProcessWriter writer = new CsvDataToProcessWriter(true, autodetectProcess, dd.build(), ac, transforms, statusReporter, jobLogger); + writer.writeHeader(); + Set inputFields = new HashSet<>(writer.inputFields()); assertEquals(3, inputFields.size()); @@ -144,7 +148,7 @@ public class AbstractDataToProcessWriterTests extends ESTestCase { assertTrue(inputFields.contains("domain")); String[] header = { "time_field", "domain", "value" }; - writer.buildTransformsAndWriteHeader(header); + writer.buildTransforms(header); List trs = writer.postDateTransforms; assertEquals(1, trs.size()); @@ -205,6 +209,8 @@ public class AbstractDataToProcessWriterTests extends ESTestCase { AbstractDataToProcessWriter writer = new CsvDataToProcessWriter(true, autodetectProcess, dd.build(), ac, transforms, statusReporter, jobLogger); + writer.writeHeader(); + Set inputFields = new HashSet<>(writer.inputFields()); assertEquals(3, inputFields.size()); @@ -213,7 +219,7 @@ public class AbstractDataToProcessWriterTests extends ESTestCase { assertTrue(inputFields.contains("domain")); String[] header = { "time_field", "domain", "value" }; - writer.buildTransformsAndWriteHeader(header); + writer.buildTransforms(header); List trs = writer.postDateTransforms; assertEquals(1, trs.size()); @@ -275,6 +281,8 @@ public class AbstractDataToProcessWriterTests extends ESTestCase { AbstractDataToProcessWriter writer = new CsvDataToProcessWriter(true, autodetectProcess, dd.build(), ac, transforms, statusReporter, jobLogger); + writer.writeHeader(); + Set inputFields = new HashSet<>(writer.inputFields()); assertEquals(4, inputFields.size()); @@ -285,7 +293,7 @@ public class AbstractDataToProcessWriterTests extends ESTestCase { String[] header = { "date", "time", "domain", "value" }; - writer.buildTransformsAndWriteHeader(header); + writer.buildTransforms(header); List trs = writer.dateInputTransforms; assertEquals(1, trs.size()); assertTrue(trs.get(0) instanceof Concat); @@ -325,9 +333,11 @@ public class AbstractDataToProcessWriterTests extends ESTestCase { AbstractDataToProcessWriter writer = new CsvDataToProcessWriter(true, autodetectProcess, dd.build(), ac, transforms, statusReporter, jobLogger); + writer.writeHeader(); + String[] header = { "datetime", "metric", "value" }; - writer.buildTransformsAndWriteHeader(header); + writer.buildTransforms(header); // metricA is excluded String[] input = { "1", "metricA", "0" }; @@ -376,9 +386,11 @@ public class AbstractDataToProcessWriterTests extends ESTestCase { AbstractDataToProcessWriter writer = new CsvDataToProcessWriter(true, autodetectProcess, dd.build(), ac, transforms, statusReporter, jobLogger); + writer.writeHeader(); + String[] header = { "date-somethingelse", "time", "type", "value" }; - writer.buildTransformsAndWriteHeader(header); + writer.buildTransforms(header); // the date input transforms should be in this order List trs = writer.dateInputTransforms; diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/process/autodetect/writer/CsvDataToProcessWriterTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/process/autodetect/writer/CsvDataToProcessWriterTests.java index c3120fcb79f..97dee32c655 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/process/autodetect/writer/CsvDataToProcessWriterTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/process/autodetect/writer/CsvDataToProcessWriterTests.java @@ -88,6 +88,7 @@ public class CsvDataToProcessWriterTests extends ESTestCase { public void testWrite_cancel() throws Exception { InputStream inputStream = endLessStream("time,metric,value\n", "1,,foo\n"); CsvDataToProcessWriter writer = createWriter(); + writer.writeHeader(); AtomicBoolean cancel = new AtomicBoolean(false); AtomicReference exception = new AtomicReference<>(); @@ -119,7 +120,7 @@ public class CsvDataToProcessWriterTests extends ESTestCase { input.append("2,bar,2.0\n"); InputStream inputStream = createInputStream(input.toString()); CsvDataToProcessWriter writer = createWriter(); - + writer.writeHeader(); writer.write(inputStream, () -> false); verify(statusReporter, times(1)).startNewIncrementalCount(); @@ -151,7 +152,7 @@ public class CsvDataToProcessWriterTests extends ESTestCase { input.append("2,,\n"); InputStream inputStream = createInputStream(input.toString()); CsvDataToProcessWriter writer = createWriter(); - + writer.writeHeader(); writer.write(inputStream, () -> false); verify(statusReporter, times(1)).startNewIncrementalCount(); @@ -174,7 +175,7 @@ public class CsvDataToProcessWriterTests extends ESTestCase { input.append("2,bar,2.0\n"); InputStream inputStream = createInputStream(input.toString()); CsvDataToProcessWriter writer = createWriter(); - + writer.writeHeader(); writer.write(inputStream, () -> false); verify(statusReporter, times(1)).startNewIncrementalCount(); @@ -199,7 +200,7 @@ public class CsvDataToProcessWriterTests extends ESTestCase { when(statusReporter.getLatestRecordTime()).thenReturn(new Date(5000L)); CsvDataToProcessWriter writer = createWriter(); - + writer.writeHeader(); writer.write(inputStream, () -> false); verify(statusReporter, times(1)).startNewIncrementalCount(); @@ -230,7 +231,7 @@ public class CsvDataToProcessWriterTests extends ESTestCase { input.append("\0"); InputStream inputStream = createInputStream(input.toString()); CsvDataToProcessWriter writer = createWriter(); - + writer.writeHeader(); writer.write(inputStream, () -> false); verify(statusReporter, times(1)).startNewIncrementalCount(); @@ -264,7 +265,7 @@ public class CsvDataToProcessWriterTests extends ESTestCase { input.append("\0"); InputStream inputStream = createInputStream(input.toString()); CsvDataToProcessWriter writer = createWriter(); - + writer.writeHeader(); writer.write(inputStream, () -> false); verify(statusReporter, times(1)).startNewIncrementalCount(); @@ -286,7 +287,7 @@ public class CsvDataToProcessWriterTests extends ESTestCase { verify(statusReporter).finishReporting(); } - public void testWrite_EmpytInput() throws IOException { + public void testWrite_EmptyInput() throws IOException { AnalysisConfig.Builder builder = new AnalysisConfig.Builder(Arrays.asList(new Detector.Builder("metric", "value").build())); builder.setLatency(0L); analysisConfig = builder.build(); @@ -295,6 +296,7 @@ public class CsvDataToProcessWriterTests extends ESTestCase { InputStream inputStream = createInputStream(""); CsvDataToProcessWriter writer = createWriter(); + writer.writeHeader(); DataCounts counts = writer.write(inputStream, () -> false); assertEquals(0L, counts.getInputBytes()); @@ -316,6 +318,7 @@ public class CsvDataToProcessWriterTests extends ESTestCase { dataDescription.setTimeFormat("yyyy-MM-ddHH:mm:ssX"); CsvDataToProcessWriter writer = createWriter(); + writer.writeHeader(); StringBuilder input = new StringBuilder(); input.append("date,time-of-day,metric,value\n"); @@ -361,7 +364,7 @@ public class CsvDataToProcessWriterTests extends ESTestCase { input.append("2,www,bar.com,2.0\n"); InputStream inputStream = createInputStream(input.toString()); CsvDataToProcessWriter writer = createWriter(); - + writer.writeHeader(); writer.write(inputStream, () -> false); verify(statusReporter, times(1)).startNewIncrementalCount(); @@ -387,6 +390,7 @@ public class CsvDataToProcessWriterTests extends ESTestCase { input.append("2,bar\",2.0\n"); InputStream inputStream = createInputStream(input.toString()); CsvDataToProcessWriter writer = createWriter(); + writer.writeHeader(); SuperCsvException e = ESTestCase.expectThrows(SuperCsvException.class, () -> writer.write(inputStream, () -> false)); // Expected line numbers are 2 and 10001, but SuperCSV may print the diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/process/autodetect/writer/DataWithTransformsToProcessWriterTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/process/autodetect/writer/DataWithTransformsToProcessWriterTests.java index 728690afdf6..dff546d9dbc 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/process/autodetect/writer/DataWithTransformsToProcessWriterTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/process/autodetect/writer/DataWithTransformsToProcessWriterTests.java @@ -67,7 +67,7 @@ public class DataWithTransformsToProcessWriterTests extends ESTestCase { InputStream inputStream = createInputStream(input.toString()); AbstractDataToProcessWriter writer = createWriter(true); - + writer.writeHeader(); writer.write(inputStream, () -> false); List expectedRecords = new ArrayList<>(); @@ -90,7 +90,7 @@ public class DataWithTransformsToProcessWriterTests extends ESTestCase { InputStream inputStream = createInputStream(input.toString()); AbstractDataToProcessWriter writer = createWriter(false); - + writer.writeHeader(); writer.write(inputStream, () -> false); List expectedRecords = new ArrayList<>(); diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/process/autodetect/writer/JsonDataToProcessWriterTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/process/autodetect/writer/JsonDataToProcessWriterTests.java index 08a63e6a356..60cee76887b 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/process/autodetect/writer/JsonDataToProcessWriterTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/process/autodetect/writer/JsonDataToProcessWriterTests.java @@ -83,6 +83,7 @@ public class JsonDataToProcessWriterTests extends ESTestCase { public void testWrite_cancel() throws Exception { InputStream inputStream = endLessStream("", "{\"time\":1}"); JsonDataToProcessWriter writer = createWriter(); + writer.writeHeader(); AtomicBoolean cancel = new AtomicBoolean(false); AtomicReference exception = new AtomicReference<>(); @@ -112,7 +113,7 @@ public class JsonDataToProcessWriterTests extends ESTestCase { input.append("{\"time\":\"2\", \"metric\":\"bar\", \"value\":\"2.0\"}"); InputStream inputStream = createInputStream(input.toString()); JsonDataToProcessWriter writer = createWriter(); - + writer.writeHeader(); writer.write(inputStream, () -> false); verify(statusReporter, times(1)).startNewIncrementalCount(); @@ -134,7 +135,7 @@ public class JsonDataToProcessWriterTests extends ESTestCase { input.append("{\"time\":\"2\", \"metric\":\"bar\", \"value\":\"2.0\"}"); InputStream inputStream = createInputStream(input.toString()); JsonDataToProcessWriter writer = createWriter(); - + writer.writeHeader(); writer.write(inputStream, () -> false); verify(statusReporter, times(1)).startNewIncrementalCount(); @@ -163,7 +164,7 @@ public class JsonDataToProcessWriterTests extends ESTestCase { input.append("{\"time\":\"2\", \"metric\":\"bar\", \"value\":\"2.0\"}"); InputStream inputStream = createInputStream(input.toString()); JsonDataToProcessWriter writer = createWriter(); - + writer.writeHeader(); writer.write(inputStream, () -> false); List expectedRecords = new ArrayList<>(); @@ -192,7 +193,7 @@ public class JsonDataToProcessWriterTests extends ESTestCase { input.append("{\"time\":\"3\", \"value\":\"3.0\"}"); InputStream inputStream = createInputStream(input.toString()); JsonDataToProcessWriter writer = createWriter(); - + writer.writeHeader(); writer.write(inputStream, () -> false); verify(statusReporter, times(1)).startNewIncrementalCount(); @@ -221,7 +222,7 @@ public class JsonDataToProcessWriterTests extends ESTestCase { input.append("{\"time\":\"3\", \"nested\":{\"value\":\"3.0\"}}"); InputStream inputStream = createInputStream(input.toString()); JsonDataToProcessWriter writer = createWriter(); - + writer.writeHeader(); writer.write(inputStream, () -> false); verify(statusReporter, times(1)).startNewIncrementalCount(); @@ -247,6 +248,7 @@ public class JsonDataToProcessWriterTests extends ESTestCase { input.append("{\"time"); InputStream inputStream = createInputStream(input.toString()); JsonDataToProcessWriter writer = createWriter(); + writer.writeHeader(); ESTestCase.expectThrows(ElasticsearchParseException.class, () -> writer.write(inputStream, () -> false)); } @@ -262,7 +264,7 @@ public class JsonDataToProcessWriterTests extends ESTestCase { input.append("{\"time\":\"2\", \"array\":[], \"value\":\"2.0\"}"); InputStream inputStream = createInputStream(input.toString()); JsonDataToProcessWriter writer = createWriter(); - + writer.writeHeader(); writer.write(inputStream, () -> false); verify(statusReporter, times(1)).startNewIncrementalCount(); @@ -291,7 +293,7 @@ public class JsonDataToProcessWriterTests extends ESTestCase { InputStream inputStream = createInputStream(input.toString()); JsonDataToProcessWriter writer = createWriter(); - + writer.writeHeader(); writer.write(inputStream, () -> false); verify(statusReporter, times(1)).startNewIncrementalCount(); @@ -327,6 +329,7 @@ public class JsonDataToProcessWriterTests extends ESTestCase { dataDescription.setTimeFormat("yyyy-MM-ddHH:mm:ssX"); JsonDataToProcessWriter writer = createWriter(); + writer.writeHeader(); StringBuilder input = new StringBuilder(); input.append("{\"date\":\"1970-01-01\", \"time-of-day\":\"00:00:01Z\", \"value\":\"5.0\"}"); @@ -369,7 +372,7 @@ public class JsonDataToProcessWriterTests extends ESTestCase { input.append("{\"time\":\"2\", \"dns1\":\"www\", \"dns2\":\"bar.com\", \"value\":\"2.0\"}"); InputStream inputStream = createInputStream(input.toString()); JsonDataToProcessWriter writer = createWriter(); - + writer.writeHeader(); writer.write(inputStream, () -> false); verify(statusReporter, times(1)).startNewIncrementalCount(); diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/process/autodetect/writer/SingleLineDataToProcessWriterTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/process/autodetect/writer/SingleLineDataToProcessWriterTests.java index ea14e6494e3..98aafae2fdd 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/process/autodetect/writer/SingleLineDataToProcessWriterTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/process/autodetect/writer/SingleLineDataToProcessWriterTests.java @@ -86,6 +86,7 @@ public class SingleLineDataToProcessWriterTests extends ESTestCase { InputStream inputStream = endLessStream("", "2015-04-29 10:00:00Z this is a message\n"); SingleLineDataToProcessWriter writer = createWriter(); + writer.writeHeader(); AtomicBoolean cancel = new AtomicBoolean(false); AtomicReference exception = new AtomicReference<>(); @@ -122,7 +123,7 @@ public class SingleLineDataToProcessWriterTests extends ESTestCase { input.append("2015-04-29 12:00:00Z This is message 3\r\n"); InputStream inputStream = createInputStream(input.toString()); SingleLineDataToProcessWriter writer = createWriter(); - + writer.writeHeader(); writer.write(inputStream, () -> false); verify(statusReporter, times(1)).getLatestRecordTime(); verify(statusReporter, times(1)).startNewIncrementalCount(); @@ -159,7 +160,7 @@ public class SingleLineDataToProcessWriterTests extends ESTestCase { input.append("2015-04-29 12:00:00Z This is message 3\n"); InputStream inputStream = createInputStream(input.toString()); SingleLineDataToProcessWriter writer = createWriter(); - + writer.writeHeader(); writer.write(inputStream, () -> false); verify(statusReporter, times(1)).getLatestRecordTime(); verify(statusReporter, times(1)).startNewIncrementalCount(); @@ -185,7 +186,7 @@ public class SingleLineDataToProcessWriterTests extends ESTestCase { input.append("2015-04-29 10:00:00Z This is message 1\n"); InputStream inputStream = createInputStream(input.toString()); SingleLineDataToProcessWriter writer = createWriter(); - + writer.writeHeader(); writer.write(inputStream, () -> false); verify(statusReporter, times(1)).startNewIncrementalCount(); verify(statusReporter, times(1)).setAnalysedFieldsPerRecord(1); diff --git a/elasticsearch/src/test/resources/rest-api-spec/test/post_data.yaml b/elasticsearch/src/test/resources/rest-api-spec/test/post_data.yaml index 465c8f3ab05..0b306fbfc59 100644 --- a/elasticsearch/src/test/resources/rest-api-spec/test/post_data.yaml +++ b/elasticsearch/src/test/resources/rest-api-spec/test/post_data.yaml @@ -70,6 +70,23 @@ setup: - match: { _source.earliest_record_timestamp: 1403481600000} - match: { _source.latest_record_timestamp: 1403481700000} +--- +"Test flush and close job WITHOUT sending any data": + - do: + xpack.prelert.flush_job: + job_id: farequote + - match: { acknowledged: true } + + - do: + xpack.prelert.close_job: + job_id: farequote + - match: { acknowledged: true } + + - do: + xpack.prelert.get_jobs_stats: + job_id: farequote + - match: { jobs.0.status: "CLOSED" } + --- "Test POST data with invalid parameters": - do: