Send header to C++ process on job open (elastic/elasticsearch#603)
This avoids the problem of the C++ process getting confused by control messages that are sent before any data messages have been sent Fixes elastic/elasticsearch#593 Original commit: elastic/x-pack-elasticsearch@9aed3f0b82
This commit is contained in:
parent
a47c5332d5
commit
92def19a73
|
@ -145,7 +145,7 @@ public class AutodetectProcessManager extends AbstractComponent implements DataP
|
|||
// TODO check for errors from autodetect
|
||||
} catch (IOException ioe) {
|
||||
String msg = String.format(Locale.ROOT, "[%s] exception while flushing job", jobId);
|
||||
logger.warn(msg);
|
||||
logger.error(msg);
|
||||
throw ExceptionsHelper.serverError(msg, ioe);
|
||||
}
|
||||
}
|
||||
|
@ -164,6 +164,13 @@ public class AutodetectProcessManager extends AbstractComponent implements DataP
|
|||
public void openJob(String jobId, boolean ignoreDowntime) {
|
||||
autoDetectCommunicatorByJob.computeIfAbsent(jobId, id -> {
|
||||
AutodetectCommunicator communicator = create(id, ignoreDowntime);
|
||||
try {
|
||||
communicator.writeJobInputHeader();
|
||||
} catch (IOException ioe) {
|
||||
String msg = String.format(Locale.ROOT, "[%s] exception while opening job", jobId);
|
||||
logger.error(msg);
|
||||
throw ExceptionsHelper.serverError(msg, ioe);
|
||||
}
|
||||
setJobStatus(jobId, JobStatus.OPENED);
|
||||
return communicator;
|
||||
});
|
||||
|
|
|
@ -74,6 +74,10 @@ public class AutodetectCommunicator implements Closeable {
|
|||
new TransformConfigs(job.getTransforms()) , statusReporter, LOGGER);
|
||||
}
|
||||
|
||||
public void writeJobInputHeader() throws IOException {
|
||||
autoDetectWriter.writeHeader();
|
||||
}
|
||||
|
||||
public DataCounts writeToJob(InputStream inputStream, DataLoadParams params, Supplier<Boolean> cancelled) throws IOException {
|
||||
return checkAndRun(() -> Messages.getMessage(Messages.JOB_DATA_CONCURRENT_USE_UPLOAD, jobId), () -> {
|
||||
if (params.isResettingBuckets()) {
|
||||
|
|
|
@ -107,10 +107,8 @@ public abstract class AbstractDataToProcessWriter implements DataToProcessWriter
|
|||
* <p>
|
||||
* Transforms can be chained so some write their outputs to
|
||||
* a scratch area which is input to another transform
|
||||
* <p>
|
||||
* Writes the header.
|
||||
*/
|
||||
public void buildTransformsAndWriteHeader(String[] header) throws IOException {
|
||||
public void buildTransforms(String[] header) throws IOException {
|
||||
Collection<String> inputFields = inputFields();
|
||||
inFieldIndexes = inputFieldIndexes(header, inputFields);
|
||||
checkForMissingFields(inputFields, inFieldIndexes, header);
|
||||
|
@ -124,13 +122,11 @@ public abstract class AbstractDataToProcessWriter implements DataToProcessWriter
|
|||
scratchArea = new String[scratchAreaIndexes.size()];
|
||||
readWriteArea[TransformFactory.SCRATCH_ARRAY_INDEX] = scratchArea;
|
||||
|
||||
|
||||
buildDateTransform(scratchAreaIndexes, outFieldIndexes);
|
||||
|
||||
List<TransformConfig> dateInputTransforms = DependencySorter.findDependencies(
|
||||
dataDescription.getTimeField(), transformConfigs.getTransforms());
|
||||
|
||||
|
||||
TransformFactory transformFactory = new TransformFactory();
|
||||
for (TransformConfig config : dateInputTransforms) {
|
||||
Transform tr = transformFactory.create(config, inFieldIndexes, scratchAreaIndexes,
|
||||
|
@ -152,8 +148,29 @@ public abstract class AbstractDataToProcessWriter implements DataToProcessWriter
|
|||
outFieldIndexes, logger);
|
||||
this.postDateTransforms.add(tr);
|
||||
}
|
||||
}
|
||||
|
||||
writeHeader(outFieldIndexes);
|
||||
/**
|
||||
* Write the header.
|
||||
* The header is created from the list of analysis input fields,
|
||||
* the time field and the control field
|
||||
*/
|
||||
@Override
|
||||
public void writeHeader() throws IOException {
|
||||
Map<String, Integer> outFieldIndexes = outputFieldIndexes();
|
||||
|
||||
// header is all the analysis input fields + the time field + control field
|
||||
int numFields = outFieldIndexes.size();
|
||||
String[] record = new String[numFields];
|
||||
|
||||
Iterator<Map.Entry<String, Integer>> itr = outFieldIndexes.entrySet().iterator();
|
||||
while (itr.hasNext()) {
|
||||
Map.Entry<String, Integer> entry = itr.next();
|
||||
record[entry.getValue()] = entry.getKey();
|
||||
}
|
||||
|
||||
// Write the header
|
||||
autodetectProcess.writeRecord(record);
|
||||
}
|
||||
|
||||
protected void buildDateTransform(Map<String, Integer> scratchAreaIndexes, Map<String, Integer> outFieldIndexes) {
|
||||
|
@ -180,7 +197,6 @@ public abstract class AbstractDataToProcessWriter implements DataToProcessWriter
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
List<TransformIndex> writeIndexes = new ArrayList<>();
|
||||
writeIndexes.add(new TransformIndex(TransformFactory.OUTPUT_ARRAY_INDEX,
|
||||
outFieldIndexes.get(dataDescription.getTimeField())));
|
||||
|
@ -195,7 +211,6 @@ public abstract class AbstractDataToProcessWriter implements DataToProcessWriter
|
|||
dateTransform = new DoubleDateTransform(dataDescription.isEpochMs(),
|
||||
readIndexes, writeIndexes, logger);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -209,7 +224,7 @@ public abstract class AbstractDataToProcessWriter implements DataToProcessWriter
|
|||
*
|
||||
* @param cancelled Determines whether the process writting has been cancelled
|
||||
* @param input The record the transforms should read their input from. The contents should
|
||||
* align with the header parameter passed to {@linkplain #buildTransformsAndWriteHeader(String[])}
|
||||
* align with the header parameter passed to {@linkplain #buildTransforms(String[])}
|
||||
* @param output The record that will be written to the length encoded writer.
|
||||
* This should be the same size as the number of output (analysis fields) i.e.
|
||||
* the size of the map returned by {@linkplain #outputFieldIndexes()}
|
||||
|
@ -284,27 +299,6 @@ public abstract class AbstractDataToProcessWriter implements DataToProcessWriter
|
|||
return true;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Write the header.
|
||||
* The header is created from the list of analysis input fields,
|
||||
* the time field and the control field
|
||||
*/
|
||||
protected void writeHeader(Map<String, Integer> outFieldIndexes) throws IOException {
|
||||
// header is all the analysis input fields + the time field + control field
|
||||
int numFields = outFieldIndexes.size();
|
||||
String[] record = new String[numFields];
|
||||
|
||||
Iterator<Map.Entry<String, Integer>> itr = outFieldIndexes.entrySet().iterator();
|
||||
while (itr.hasNext()) {
|
||||
Map.Entry<String, Integer> entry = itr.next();
|
||||
record[entry.getValue()] = entry.getKey();
|
||||
}
|
||||
|
||||
// Write the header
|
||||
autodetectProcess.writeRecord(record);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void flush() throws IOException {
|
||||
autodetectProcess.flushStream();
|
||||
|
@ -471,7 +465,7 @@ public abstract class AbstractDataToProcessWriter implements DataToProcessWriter
|
|||
* Either return true or throw a MissingFieldException
|
||||
* <p>
|
||||
* Every input field should have an entry in <code>inputFieldIndexes</code>
|
||||
* otherwise the field cannnot be found.
|
||||
* otherwise the field cannot be found.
|
||||
*/
|
||||
protected abstract boolean checkForMissingFields(Collection<String> inputFields, Map<String, Integer> inputFieldIndexes,
|
||||
String[] header);
|
||||
|
|
|
@ -81,7 +81,7 @@ class CsvDataToProcessWriter extends AbstractDataToProcessWriter {
|
|||
|
||||
long inputFieldCount = Math.max(header.length - 1, 0); // time field doesn't count
|
||||
|
||||
buildTransformsAndWriteHeader(header);
|
||||
buildTransforms(header);
|
||||
|
||||
//backing array for the inputIndex
|
||||
String[] inputRecord = new String[header.length];
|
||||
|
@ -146,8 +146,7 @@ class CsvDataToProcessWriter extends AbstractDataToProcessWriter {
|
|||
}
|
||||
Integer index = inputFieldIndexes.get(field);
|
||||
if (index == null) {
|
||||
String msg = String.format(Locale.ROOT, "Field configured for analysis "
|
||||
+ "'%s' is not in the CSV header '%s'",
|
||||
String msg = String.format(Locale.ROOT, "Field configured for analysis '%s' is not in the CSV header '%s'",
|
||||
field, Arrays.toString(header));
|
||||
|
||||
logger.error(msg);
|
||||
|
|
|
@ -16,6 +16,14 @@ import org.elasticsearch.xpack.prelert.job.DataCounts;
|
|||
* inputstream to outputstream as the process expects.
|
||||
*/
|
||||
public interface DataToProcessWriter {
|
||||
|
||||
/**
|
||||
* Write the header.
|
||||
* The header is created from the list of analysis input fields,
|
||||
* the time field and the control field.
|
||||
*/
|
||||
void writeHeader() throws IOException;
|
||||
|
||||
/**
|
||||
* Reads the inputIndex, transform to length encoded values and pipe
|
||||
* to the OutputStream.
|
||||
|
|
|
@ -64,7 +64,7 @@ class JsonDataToProcessWriter extends AbstractDataToProcessWriter {
|
|||
private void writeJson(JsonParser parser, Supplier<Boolean> cancelled) throws IOException {
|
||||
Collection<String> analysisFields = inputFields();
|
||||
|
||||
buildTransformsAndWriteHeader(analysisFields.toArray(new String[0]));
|
||||
buildTransforms(analysisFields.toArray(new String[0]));
|
||||
|
||||
int numFields = outputFieldCount();
|
||||
String[] input = new String[numFields];
|
||||
|
|
|
@ -49,7 +49,7 @@ public class SingleLineDataToProcessWriter extends AbstractDataToProcessWriter {
|
|||
|
||||
try (BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8))) {
|
||||
String[] header = {RAW};
|
||||
buildTransformsAndWriteHeader(header);
|
||||
buildTransforms(header);
|
||||
|
||||
int numFields = outputFieldCount();
|
||||
String[] record = new String[numFields];
|
||||
|
|
|
@ -79,6 +79,8 @@ public class AbstractDataToProcessWriterTests extends ESTestCase {
|
|||
AbstractDataToProcessWriter writer =
|
||||
new CsvDataToProcessWriter(true, autodetectProcess, dd.build(), ac, transforms, statusReporter, jobLogger);
|
||||
|
||||
writer.writeHeader();
|
||||
|
||||
Set<String> inputFields = new HashSet<>(writer.inputFields());
|
||||
assertEquals(4, inputFields.size());
|
||||
assertTrue(inputFields.contains("time_field"));
|
||||
|
@ -87,7 +89,7 @@ public class AbstractDataToProcessWriterTests extends ESTestCase {
|
|||
assertTrue(inputFields.contains("metric"));
|
||||
|
||||
String[] header = { "time_field", "metric", "host", "value" };
|
||||
writer.buildTransformsAndWriteHeader(header);
|
||||
writer.buildTransforms(header);
|
||||
List<Transform> trs = writer.postDateTransforms;
|
||||
assertEquals(1, trs.size());
|
||||
Transform tr = trs.get(0);
|
||||
|
@ -136,6 +138,8 @@ public class AbstractDataToProcessWriterTests extends ESTestCase {
|
|||
AbstractDataToProcessWriter writer =
|
||||
new CsvDataToProcessWriter(true, autodetectProcess, dd.build(), ac, transforms, statusReporter, jobLogger);
|
||||
|
||||
writer.writeHeader();
|
||||
|
||||
Set<String> inputFields = new HashSet<>(writer.inputFields());
|
||||
|
||||
assertEquals(3, inputFields.size());
|
||||
|
@ -144,7 +148,7 @@ public class AbstractDataToProcessWriterTests extends ESTestCase {
|
|||
assertTrue(inputFields.contains("domain"));
|
||||
|
||||
String[] header = { "time_field", "domain", "value" };
|
||||
writer.buildTransformsAndWriteHeader(header);
|
||||
writer.buildTransforms(header);
|
||||
List<Transform> trs = writer.postDateTransforms;
|
||||
assertEquals(1, trs.size());
|
||||
|
||||
|
@ -205,6 +209,8 @@ public class AbstractDataToProcessWriterTests extends ESTestCase {
|
|||
AbstractDataToProcessWriter writer =
|
||||
new CsvDataToProcessWriter(true, autodetectProcess, dd.build(), ac, transforms, statusReporter, jobLogger);
|
||||
|
||||
writer.writeHeader();
|
||||
|
||||
Set<String> inputFields = new HashSet<>(writer.inputFields());
|
||||
|
||||
assertEquals(3, inputFields.size());
|
||||
|
@ -213,7 +219,7 @@ public class AbstractDataToProcessWriterTests extends ESTestCase {
|
|||
assertTrue(inputFields.contains("domain"));
|
||||
|
||||
String[] header = { "time_field", "domain", "value" };
|
||||
writer.buildTransformsAndWriteHeader(header);
|
||||
writer.buildTransforms(header);
|
||||
List<Transform> trs = writer.postDateTransforms;
|
||||
assertEquals(1, trs.size());
|
||||
|
||||
|
@ -275,6 +281,8 @@ public class AbstractDataToProcessWriterTests extends ESTestCase {
|
|||
AbstractDataToProcessWriter writer =
|
||||
new CsvDataToProcessWriter(true, autodetectProcess, dd.build(), ac, transforms, statusReporter, jobLogger);
|
||||
|
||||
writer.writeHeader();
|
||||
|
||||
Set<String> inputFields = new HashSet<>(writer.inputFields());
|
||||
|
||||
assertEquals(4, inputFields.size());
|
||||
|
@ -285,7 +293,7 @@ public class AbstractDataToProcessWriterTests extends ESTestCase {
|
|||
|
||||
String[] header = { "date", "time", "domain", "value" };
|
||||
|
||||
writer.buildTransformsAndWriteHeader(header);
|
||||
writer.buildTransforms(header);
|
||||
List<Transform> trs = writer.dateInputTransforms;
|
||||
assertEquals(1, trs.size());
|
||||
assertTrue(trs.get(0) instanceof Concat);
|
||||
|
@ -325,9 +333,11 @@ public class AbstractDataToProcessWriterTests extends ESTestCase {
|
|||
AbstractDataToProcessWriter writer =
|
||||
new CsvDataToProcessWriter(true, autodetectProcess, dd.build(), ac, transforms, statusReporter, jobLogger);
|
||||
|
||||
writer.writeHeader();
|
||||
|
||||
String[] header = { "datetime", "metric", "value" };
|
||||
|
||||
writer.buildTransformsAndWriteHeader(header);
|
||||
writer.buildTransforms(header);
|
||||
|
||||
// metricA is excluded
|
||||
String[] input = { "1", "metricA", "0" };
|
||||
|
@ -376,9 +386,11 @@ public class AbstractDataToProcessWriterTests extends ESTestCase {
|
|||
AbstractDataToProcessWriter writer =
|
||||
new CsvDataToProcessWriter(true, autodetectProcess, dd.build(), ac, transforms, statusReporter, jobLogger);
|
||||
|
||||
writer.writeHeader();
|
||||
|
||||
String[] header = { "date-somethingelse", "time", "type", "value" };
|
||||
|
||||
writer.buildTransformsAndWriteHeader(header);
|
||||
writer.buildTransforms(header);
|
||||
|
||||
// the date input transforms should be in this order
|
||||
List<Transform> trs = writer.dateInputTransforms;
|
||||
|
|
|
@ -88,6 +88,7 @@ public class CsvDataToProcessWriterTests extends ESTestCase {
|
|||
public void testWrite_cancel() throws Exception {
|
||||
InputStream inputStream = endLessStream("time,metric,value\n", "1,,foo\n");
|
||||
CsvDataToProcessWriter writer = createWriter();
|
||||
writer.writeHeader();
|
||||
|
||||
AtomicBoolean cancel = new AtomicBoolean(false);
|
||||
AtomicReference<Exception> exception = new AtomicReference<>();
|
||||
|
@ -119,7 +120,7 @@ public class CsvDataToProcessWriterTests extends ESTestCase {
|
|||
input.append("2,bar,2.0\n");
|
||||
InputStream inputStream = createInputStream(input.toString());
|
||||
CsvDataToProcessWriter writer = createWriter();
|
||||
|
||||
writer.writeHeader();
|
||||
writer.write(inputStream, () -> false);
|
||||
verify(statusReporter, times(1)).startNewIncrementalCount();
|
||||
|
||||
|
@ -151,7 +152,7 @@ public class CsvDataToProcessWriterTests extends ESTestCase {
|
|||
input.append("2,,\n");
|
||||
InputStream inputStream = createInputStream(input.toString());
|
||||
CsvDataToProcessWriter writer = createWriter();
|
||||
|
||||
writer.writeHeader();
|
||||
writer.write(inputStream, () -> false);
|
||||
verify(statusReporter, times(1)).startNewIncrementalCount();
|
||||
|
||||
|
@ -174,7 +175,7 @@ public class CsvDataToProcessWriterTests extends ESTestCase {
|
|||
input.append("2,bar,2.0\n");
|
||||
InputStream inputStream = createInputStream(input.toString());
|
||||
CsvDataToProcessWriter writer = createWriter();
|
||||
|
||||
writer.writeHeader();
|
||||
writer.write(inputStream, () -> false);
|
||||
verify(statusReporter, times(1)).startNewIncrementalCount();
|
||||
|
||||
|
@ -199,7 +200,7 @@ public class CsvDataToProcessWriterTests extends ESTestCase {
|
|||
|
||||
when(statusReporter.getLatestRecordTime()).thenReturn(new Date(5000L));
|
||||
CsvDataToProcessWriter writer = createWriter();
|
||||
|
||||
writer.writeHeader();
|
||||
writer.write(inputStream, () -> false);
|
||||
verify(statusReporter, times(1)).startNewIncrementalCount();
|
||||
|
||||
|
@ -230,7 +231,7 @@ public class CsvDataToProcessWriterTests extends ESTestCase {
|
|||
input.append("\0");
|
||||
InputStream inputStream = createInputStream(input.toString());
|
||||
CsvDataToProcessWriter writer = createWriter();
|
||||
|
||||
writer.writeHeader();
|
||||
writer.write(inputStream, () -> false);
|
||||
verify(statusReporter, times(1)).startNewIncrementalCount();
|
||||
|
||||
|
@ -264,7 +265,7 @@ public class CsvDataToProcessWriterTests extends ESTestCase {
|
|||
input.append("\0");
|
||||
InputStream inputStream = createInputStream(input.toString());
|
||||
CsvDataToProcessWriter writer = createWriter();
|
||||
|
||||
writer.writeHeader();
|
||||
writer.write(inputStream, () -> false);
|
||||
verify(statusReporter, times(1)).startNewIncrementalCount();
|
||||
|
||||
|
@ -286,7 +287,7 @@ public class CsvDataToProcessWriterTests extends ESTestCase {
|
|||
verify(statusReporter).finishReporting();
|
||||
}
|
||||
|
||||
public void testWrite_EmpytInput() throws IOException {
|
||||
public void testWrite_EmptyInput() throws IOException {
|
||||
AnalysisConfig.Builder builder = new AnalysisConfig.Builder(Arrays.asList(new Detector.Builder("metric", "value").build()));
|
||||
builder.setLatency(0L);
|
||||
analysisConfig = builder.build();
|
||||
|
@ -295,6 +296,7 @@ public class CsvDataToProcessWriterTests extends ESTestCase {
|
|||
|
||||
InputStream inputStream = createInputStream("");
|
||||
CsvDataToProcessWriter writer = createWriter();
|
||||
writer.writeHeader();
|
||||
|
||||
DataCounts counts = writer.write(inputStream, () -> false);
|
||||
assertEquals(0L, counts.getInputBytes());
|
||||
|
@ -316,6 +318,7 @@ public class CsvDataToProcessWriterTests extends ESTestCase {
|
|||
dataDescription.setTimeFormat("yyyy-MM-ddHH:mm:ssX");
|
||||
|
||||
CsvDataToProcessWriter writer = createWriter();
|
||||
writer.writeHeader();
|
||||
|
||||
StringBuilder input = new StringBuilder();
|
||||
input.append("date,time-of-day,metric,value\n");
|
||||
|
@ -361,7 +364,7 @@ public class CsvDataToProcessWriterTests extends ESTestCase {
|
|||
input.append("2,www,bar.com,2.0\n");
|
||||
InputStream inputStream = createInputStream(input.toString());
|
||||
CsvDataToProcessWriter writer = createWriter();
|
||||
|
||||
writer.writeHeader();
|
||||
writer.write(inputStream, () -> false);
|
||||
verify(statusReporter, times(1)).startNewIncrementalCount();
|
||||
|
||||
|
@ -387,6 +390,7 @@ public class CsvDataToProcessWriterTests extends ESTestCase {
|
|||
input.append("2,bar\",2.0\n");
|
||||
InputStream inputStream = createInputStream(input.toString());
|
||||
CsvDataToProcessWriter writer = createWriter();
|
||||
writer.writeHeader();
|
||||
|
||||
SuperCsvException e = ESTestCase.expectThrows(SuperCsvException.class, () -> writer.write(inputStream, () -> false));
|
||||
// Expected line numbers are 2 and 10001, but SuperCSV may print the
|
||||
|
|
|
@ -67,7 +67,7 @@ public class DataWithTransformsToProcessWriterTests extends ESTestCase {
|
|||
|
||||
InputStream inputStream = createInputStream(input.toString());
|
||||
AbstractDataToProcessWriter writer = createWriter(true);
|
||||
|
||||
writer.writeHeader();
|
||||
writer.write(inputStream, () -> false);
|
||||
|
||||
List<String[]> expectedRecords = new ArrayList<>();
|
||||
|
@ -90,7 +90,7 @@ public class DataWithTransformsToProcessWriterTests extends ESTestCase {
|
|||
|
||||
InputStream inputStream = createInputStream(input.toString());
|
||||
AbstractDataToProcessWriter writer = createWriter(false);
|
||||
|
||||
writer.writeHeader();
|
||||
writer.write(inputStream, () -> false);
|
||||
|
||||
List<String[]> expectedRecords = new ArrayList<>();
|
||||
|
|
|
@ -83,6 +83,7 @@ public class JsonDataToProcessWriterTests extends ESTestCase {
|
|||
public void testWrite_cancel() throws Exception {
|
||||
InputStream inputStream = endLessStream("", "{\"time\":1}");
|
||||
JsonDataToProcessWriter writer = createWriter();
|
||||
writer.writeHeader();
|
||||
|
||||
AtomicBoolean cancel = new AtomicBoolean(false);
|
||||
AtomicReference<Exception> exception = new AtomicReference<>();
|
||||
|
@ -112,7 +113,7 @@ public class JsonDataToProcessWriterTests extends ESTestCase {
|
|||
input.append("{\"time\":\"2\", \"metric\":\"bar\", \"value\":\"2.0\"}");
|
||||
InputStream inputStream = createInputStream(input.toString());
|
||||
JsonDataToProcessWriter writer = createWriter();
|
||||
|
||||
writer.writeHeader();
|
||||
writer.write(inputStream, () -> false);
|
||||
verify(statusReporter, times(1)).startNewIncrementalCount();
|
||||
|
||||
|
@ -134,7 +135,7 @@ public class JsonDataToProcessWriterTests extends ESTestCase {
|
|||
input.append("{\"time\":\"2\", \"metric\":\"bar\", \"value\":\"2.0\"}");
|
||||
InputStream inputStream = createInputStream(input.toString());
|
||||
JsonDataToProcessWriter writer = createWriter();
|
||||
|
||||
writer.writeHeader();
|
||||
writer.write(inputStream, () -> false);
|
||||
verify(statusReporter, times(1)).startNewIncrementalCount();
|
||||
|
||||
|
@ -163,7 +164,7 @@ public class JsonDataToProcessWriterTests extends ESTestCase {
|
|||
input.append("{\"time\":\"2\", \"metric\":\"bar\", \"value\":\"2.0\"}");
|
||||
InputStream inputStream = createInputStream(input.toString());
|
||||
JsonDataToProcessWriter writer = createWriter();
|
||||
|
||||
writer.writeHeader();
|
||||
writer.write(inputStream, () -> false);
|
||||
|
||||
List<String[]> expectedRecords = new ArrayList<>();
|
||||
|
@ -192,7 +193,7 @@ public class JsonDataToProcessWriterTests extends ESTestCase {
|
|||
input.append("{\"time\":\"3\", \"value\":\"3.0\"}");
|
||||
InputStream inputStream = createInputStream(input.toString());
|
||||
JsonDataToProcessWriter writer = createWriter();
|
||||
|
||||
writer.writeHeader();
|
||||
writer.write(inputStream, () -> false);
|
||||
verify(statusReporter, times(1)).startNewIncrementalCount();
|
||||
|
||||
|
@ -221,7 +222,7 @@ public class JsonDataToProcessWriterTests extends ESTestCase {
|
|||
input.append("{\"time\":\"3\", \"nested\":{\"value\":\"3.0\"}}");
|
||||
InputStream inputStream = createInputStream(input.toString());
|
||||
JsonDataToProcessWriter writer = createWriter();
|
||||
|
||||
writer.writeHeader();
|
||||
writer.write(inputStream, () -> false);
|
||||
verify(statusReporter, times(1)).startNewIncrementalCount();
|
||||
|
||||
|
@ -247,6 +248,7 @@ public class JsonDataToProcessWriterTests extends ESTestCase {
|
|||
input.append("{\"time");
|
||||
InputStream inputStream = createInputStream(input.toString());
|
||||
JsonDataToProcessWriter writer = createWriter();
|
||||
writer.writeHeader();
|
||||
|
||||
ESTestCase.expectThrows(ElasticsearchParseException.class, () -> writer.write(inputStream, () -> false));
|
||||
}
|
||||
|
@ -262,7 +264,7 @@ public class JsonDataToProcessWriterTests extends ESTestCase {
|
|||
input.append("{\"time\":\"2\", \"array\":[], \"value\":\"2.0\"}");
|
||||
InputStream inputStream = createInputStream(input.toString());
|
||||
JsonDataToProcessWriter writer = createWriter();
|
||||
|
||||
writer.writeHeader();
|
||||
writer.write(inputStream, () -> false);
|
||||
verify(statusReporter, times(1)).startNewIncrementalCount();
|
||||
|
||||
|
@ -291,7 +293,7 @@ public class JsonDataToProcessWriterTests extends ESTestCase {
|
|||
|
||||
InputStream inputStream = createInputStream(input.toString());
|
||||
JsonDataToProcessWriter writer = createWriter();
|
||||
|
||||
writer.writeHeader();
|
||||
writer.write(inputStream, () -> false);
|
||||
verify(statusReporter, times(1)).startNewIncrementalCount();
|
||||
|
||||
|
@ -327,6 +329,7 @@ public class JsonDataToProcessWriterTests extends ESTestCase {
|
|||
dataDescription.setTimeFormat("yyyy-MM-ddHH:mm:ssX");
|
||||
|
||||
JsonDataToProcessWriter writer = createWriter();
|
||||
writer.writeHeader();
|
||||
|
||||
StringBuilder input = new StringBuilder();
|
||||
input.append("{\"date\":\"1970-01-01\", \"time-of-day\":\"00:00:01Z\", \"value\":\"5.0\"}");
|
||||
|
@ -369,7 +372,7 @@ public class JsonDataToProcessWriterTests extends ESTestCase {
|
|||
input.append("{\"time\":\"2\", \"dns1\":\"www\", \"dns2\":\"bar.com\", \"value\":\"2.0\"}");
|
||||
InputStream inputStream = createInputStream(input.toString());
|
||||
JsonDataToProcessWriter writer = createWriter();
|
||||
|
||||
writer.writeHeader();
|
||||
writer.write(inputStream, () -> false);
|
||||
verify(statusReporter, times(1)).startNewIncrementalCount();
|
||||
|
||||
|
|
|
@ -86,6 +86,7 @@ public class SingleLineDataToProcessWriterTests extends ESTestCase {
|
|||
|
||||
InputStream inputStream = endLessStream("", "2015-04-29 10:00:00Z this is a message\n");
|
||||
SingleLineDataToProcessWriter writer = createWriter();
|
||||
writer.writeHeader();
|
||||
|
||||
AtomicBoolean cancel = new AtomicBoolean(false);
|
||||
AtomicReference<Exception> exception = new AtomicReference<>();
|
||||
|
@ -122,7 +123,7 @@ public class SingleLineDataToProcessWriterTests extends ESTestCase {
|
|||
input.append("2015-04-29 12:00:00Z This is message 3\r\n");
|
||||
InputStream inputStream = createInputStream(input.toString());
|
||||
SingleLineDataToProcessWriter writer = createWriter();
|
||||
|
||||
writer.writeHeader();
|
||||
writer.write(inputStream, () -> false);
|
||||
verify(statusReporter, times(1)).getLatestRecordTime();
|
||||
verify(statusReporter, times(1)).startNewIncrementalCount();
|
||||
|
@ -159,7 +160,7 @@ public class SingleLineDataToProcessWriterTests extends ESTestCase {
|
|||
input.append("2015-04-29 12:00:00Z This is message 3\n");
|
||||
InputStream inputStream = createInputStream(input.toString());
|
||||
SingleLineDataToProcessWriter writer = createWriter();
|
||||
|
||||
writer.writeHeader();
|
||||
writer.write(inputStream, () -> false);
|
||||
verify(statusReporter, times(1)).getLatestRecordTime();
|
||||
verify(statusReporter, times(1)).startNewIncrementalCount();
|
||||
|
@ -185,7 +186,7 @@ public class SingleLineDataToProcessWriterTests extends ESTestCase {
|
|||
input.append("2015-04-29 10:00:00Z This is message 1\n");
|
||||
InputStream inputStream = createInputStream(input.toString());
|
||||
SingleLineDataToProcessWriter writer = createWriter();
|
||||
|
||||
writer.writeHeader();
|
||||
writer.write(inputStream, () -> false);
|
||||
verify(statusReporter, times(1)).startNewIncrementalCount();
|
||||
verify(statusReporter, times(1)).setAnalysedFieldsPerRecord(1);
|
||||
|
|
|
@ -70,6 +70,23 @@ setup:
|
|||
- match: { _source.earliest_record_timestamp: 1403481600000}
|
||||
- match: { _source.latest_record_timestamp: 1403481700000}
|
||||
|
||||
---
|
||||
"Test flush and close job WITHOUT sending any data":
|
||||
- do:
|
||||
xpack.prelert.flush_job:
|
||||
job_id: farequote
|
||||
- match: { acknowledged: true }
|
||||
|
||||
- do:
|
||||
xpack.prelert.close_job:
|
||||
job_id: farequote
|
||||
- match: { acknowledged: true }
|
||||
|
||||
- do:
|
||||
xpack.prelert.get_jobs_stats:
|
||||
job_id: farequote
|
||||
- match: { jobs.0.status: "CLOSED" }
|
||||
|
||||
---
|
||||
"Test POST data with invalid parameters":
|
||||
- do:
|
||||
|
|
Loading…
Reference in New Issue