Move the post data API over to the task API.

Also made the post data API cancellable.

Original commit: elastic/x-pack-elasticsearch@55360609de
Martijn van Groningen 2016-11-30 23:12:01 +01:00
parent a82dc82439
commit 1e5a12fc6a
19 changed files with 239 additions and 71 deletions
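
In short: each post-data request now creates a CancellableTask, the transport action hands the task's isCancelled check down to the data writers as a Supplier<Boolean>, and the writers poll it once per record, aborting with a TaskCancelledException. A minimal, self-contained sketch of that pattern (the class names below are illustrative stand-ins, not the real PostDataTask or org.elasticsearch.tasks.TaskCancelledException):

import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;

// Stand-in for org.elasticsearch.tasks.TaskCancelledException.
class CancelledSketchException extends RuntimeException {
    CancelledSketchException(String message) { super(message); }
}

// Stand-in for the CancellableTask created per post-data request.
class UploadTaskSketch {
    private volatile boolean cancelled;
    void cancel() { cancelled = true; }
    boolean isCancelled() { return cancelled; }
}

class CancellationPatternSketch {
    // Mirrors applyTransformsAndWrite: the supplier is polled once per
    // record, so a cancel takes effect at the next record boundary.
    static void writeRecords(List<String> records, Supplier<Boolean> cancelled) {
        for (String record : records) {
            if (cancelled.get()) {
                throw new CancelledSketchException("cancelled");
            }
            // ... transform the record and write it to the process ...
        }
    }

    public static void main(String[] args) {
        UploadTaskSketch task = new UploadTaskSketch();
        // The transport action passes a method reference, which keeps the
        // writers free of any compile-time dependency on the task API.
        writeRecords(Arrays.asList("a", "b"), task::isCancelled);
    }
}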

View File

@ -5,7 +5,6 @@
*/
package org.elasticsearch.xpack.prelert.action;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
@ -25,6 +24,9 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.StatusToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.prelert.PrelertPlugin;
@ -124,6 +126,14 @@ public class PostDataAction extends Action<PostDataAction.Request, PostDataActio
}
}
static class PostDataTask extends CancellableTask {
PostDataTask(long id, String type, String action, TaskId parentTaskId, String jobId) {
super(id, type, action, jobId + "_post_data", parentTaskId);
}
}
public static class Request extends ActionRequest {
public static final ParseField IGNORE_DOWNTIME = new ParseField("ignoreDowntime");
@ -178,6 +188,11 @@ public class PostDataAction extends Action<PostDataAction.Request, PostDataActio
this.content = content;
}
@Override
public Task createTask(long id, String type, String action, TaskId parentTaskId) {
return new PostDataTask(id, type, action, parentTaskId, jobId);
}
@Override
public ActionRequestValidationException validate() {
return null;
@ -241,17 +256,24 @@ public class PostDataAction extends Action<PostDataAction.Request, PostDataActio
}
@Override
protected final void doExecute(Request request, ActionListener<Response> listener) {
protected void doExecute(Task task, Request request, ActionListener<Response> listener) {
PostDataTask postDataTask = (PostDataTask) task;
TimeRange timeRange = TimeRange.builder().startTime(request.getResetStart()).endTime(request.getResetEnd()).build();
DataLoadParams params = new DataLoadParams(timeRange, request.isIgnoreDowntime());
threadPool.executor(PrelertPlugin.THREAD_POOL_NAME).execute(() -> {
try {
DataCounts dataCounts = processManager.processData(request.getJobId(), request.content.streamInput(), params);
DataCounts dataCounts = processManager.processData(request.getJobId(), request.content.streamInput(), params,
postDataTask::isCancelled);
listener.onResponse(new Response(dataCounts));
} catch (IOException | ElasticsearchException e) {
} catch (Exception e) {
listener.onFailure(e);
}
});
}
@Override
protected final void doExecute(Request request, ActionListener<Response> listener) {
throw new UnsupportedOperationException("the task parameter is required");
}
}
}
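
Since the upload now runs as a cancellable task, an in-flight post should be stoppable through the generic task management API rather than anything prelert-specific. A hedged sketch, assuming the stock 5.x cancel-tasks client builders (this call is not part of the commit):

import org.elasticsearch.client.Client;
import org.elasticsearch.tasks.TaskId;

class CancelUploadSketch {
    // Requests cancellation of the PostDataTask with the given id; its
    // isCancelled() then returns true and the writer aborts at the next
    // record boundary.
    static void cancelUpload(Client client, TaskId taskId) {
        client.admin().cluster()
                .prepareCancelTasks()
                .setTaskId(taskId)
                .get();
    }
}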

View File

@ -11,6 +11,7 @@ import org.elasticsearch.xpack.prelert.job.process.autodetect.params.DataLoadPar
import org.elasticsearch.xpack.prelert.job.process.autodetect.params.InterimResultsParams;
import java.io.InputStream;
import java.util.function.Supplier;
public interface DataProcessor {
@ -27,12 +28,13 @@ public interface DataProcessor {
* <li>If a high proportion of the records are chronologically out of order</li>
* </ol>
*
* @param jobId the jobId
* @param input Data input stream
* @param params Data processing parameters
* @param jobId the jobId
* @param input Data input stream
* @param params Data processing parameters
* @param cancelled Whether the data processing has been cancelled
* @return Count of records, fields, bytes, etc written
*/
DataCounts processData(String jobId, InputStream input, DataLoadParams params);
DataCounts processData(String jobId, InputStream input, DataLoadParams params, Supplier<Boolean> cancelled);
/**
* Flush the running job, ensuring that the native process has had the

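Both flavours of the new Supplier<Boolean> argument appear in this commit: the transport action passes the live task's isCancelled, while ScheduledJob, which is not driven by a cancellable task, passes a constant () -> false. A trimmed sketch of the two call shapes (DataProcessorSketch is a hypothetical cut-down of this interface):

import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.util.function.Supplier;

interface DataProcessorSketch {
    // The new argument: polled during the write loop.
    void processData(String jobId, InputStream input, Supplier<Boolean> cancelled);
}

class CallShapesSketch {
    static void fromTask(DataProcessorSketch processor, InputStream in, Supplier<Boolean> taskIsCancelled) {
        processor.processData("foo", in, taskIsCancelled); // task-driven cancellation
    }

    static void fromScheduler(DataProcessorSketch processor) {
        InputStream in = new ByteArrayInputStream(new byte[0]);
        processor.processData("foo", in, () -> false);     // never cancelled
    }
}
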
View File

@ -49,6 +49,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;
public class AutodetectProcessManager extends AbstractComponent implements DataProcessor {
@ -93,7 +94,7 @@ public class AutodetectProcessManager extends AbstractComponent implements DataP
}
@Override
public DataCounts processData(String jobId, InputStream input, DataLoadParams params) {
public DataCounts processData(String jobId, InputStream input, DataLoadParams params, Supplier<Boolean> cancelled) {
Allocation allocation = jobManager.getJobAllocation(jobId);
if (allocation.getStatus().isAnyOf(JobStatus.PAUSING, JobStatus.PAUSED)) {
return new DataCounts(jobId);
@ -105,7 +106,7 @@ public class AutodetectProcessManager extends AbstractComponent implements DataP
return c;
});
try {
return communicator.writeToJob(input, params);
return communicator.writeToJob(input, params, cancelled);
// TODO check for errors from autodetect
} catch (IOException e) {
String msg = String.format(Locale.ROOT, "Exception writing to process for job %s", jobId);

View File

@ -73,13 +73,13 @@ public class AutodetectCommunicator implements Closeable {
job.getSchedulerConfig(), new TransformConfigs(job.getTransforms()), statusReporter, LOGGER);
}
public DataCounts writeToJob(InputStream inputStream, DataLoadParams params) throws IOException {
public DataCounts writeToJob(InputStream inputStream, DataLoadParams params, Supplier<Boolean> cancelled) throws IOException {
return checkAndRun(() -> Messages.getMessage(Messages.JOB_DATA_CONCURRENT_USE_UPLOAD, jobId), () -> {
if (params.isResettingBuckets()) {
autodetectProcess.writeResetBucketsControlMessage(params);
}
CountingInputStream countingStream = new CountingInputStream(inputStream, statusReporter);
DataCounts results = autoDetectWriter.write(countingStream);
DataCounts results = autoDetectWriter.write(countingStream, cancelled);
autoDetectWriter.flush();
return results;
}, false);

View File

@ -21,9 +21,11 @@ import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.Supplier;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.tasks.TaskCancelledException;
import org.elasticsearch.xpack.prelert.job.AnalysisConfig;
import org.elasticsearch.xpack.prelert.job.DataDescription;
import org.elasticsearch.xpack.prelert.job.DataDescription.DataFormat;
@ -94,7 +96,8 @@ public abstract class AbstractDataToProcessWriter implements DataToProcessWriter
/**
* Create the transforms. This must be called before {@linkplain #write(java.io.InputStream)}
* Create the transforms. This must be called before
* {@linkplain DataToProcessWriter#write(java.io.InputStream, java.util.function.Supplier)}
* even if no transforms are configured as it creates the
* date transform and sets up the field mappings.<br>
* <p>
@ -204,6 +207,7 @@ public abstract class AbstractDataToProcessWriter implements DataToProcessWriter
* First all the transforms whose outputs the Date transform relies
* on are executed, then the date transform, then the remaining transforms.
*
* @param cancelled Determines whether the process writing has been cancelled
* @param input The record the transforms should read their input from. The contents should
* align with the header parameter passed to {@linkplain #buildTransformsAndWriteHeader(String[])}
* @param output The record that will be written to the length encoded writer.
@ -211,8 +215,12 @@ public abstract class AbstractDataToProcessWriter implements DataToProcessWriter
* the size of the map returned by {@linkplain #outputFieldIndexes()}
* @param numberOfFieldsRead The total number read not just those included in the analysis
*/
protected boolean applyTransformsAndWrite(String[] input, String[] output, long numberOfFieldsRead)
protected boolean applyTransformsAndWrite(Supplier<Boolean> cancelled, String[] input, String[] output, long numberOfFieldsRead)
throws IOException {
if (cancelled.get()) {
throw new TaskCancelledException("cancelled");
}
readWriteArea[TransformFactory.INPUT_ARRAY_INDEX] = input;
readWriteArea[TransformFactory.OUTPUT_ARRAY_INDEX] = output;
Arrays.fill(readWriteArea[TransformFactory.SCRATCH_ARRAY_INDEX], "");

View File

@ -24,6 +24,7 @@ import java.util.Collection;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.function.Supplier;
/**
* A writer for transforming and piping CSV data from an
@ -62,7 +63,7 @@ class CsvDataToProcessWriter extends AbstractDataToProcessWriter {
* header an exception is thrown
*/
@Override
public DataCounts write(InputStream inputStream) throws IOException {
public DataCounts write(InputStream inputStream, Supplier<Boolean> cancelled) throws IOException {
CsvPreference csvPref = new CsvPreference.Builder(
dataDescription.getQuoteCharacter(),
dataDescription.getFieldDelimiter(),
@ -117,7 +118,7 @@ class CsvDataToProcessWriter extends AbstractDataToProcessWriter {
}
fillRecordFromLine(line, inputRecord);
applyTransformsAndWrite(inputRecord, record, inputFieldCount);
applyTransformsAndWrite(cancelled, inputRecord, record, inputFieldCount);
}
// This function can throw

View File

@ -7,6 +7,7 @@ package org.elasticsearch.xpack.prelert.job.process.autodetect.writer;
import java.io.IOException;
import java.io.InputStream;
import java.util.function.Supplier;
import org.elasticsearch.xpack.prelert.job.DataCounts;
@ -24,7 +25,7 @@ public interface DataToProcessWriter {
*
* @return Counts of the records processed, bytes read etc
*/
DataCounts write(InputStream inputStream) throws IOException;
DataCounts write(InputStream inputStream, Supplier<Boolean> cancelled) throws IOException;
/**
* Flush the output stream

View File

@ -12,6 +12,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
import org.apache.logging.log4j.Logger;
@ -59,11 +60,11 @@ class JsonDataToProcessWriter extends AbstractDataToProcessWriter {
* timeField is missing from the JSON inputIndex an exception is thrown
*/
@Override
public DataCounts write(InputStream inputStream) throws IOException {
public DataCounts write(InputStream inputStream, Supplier<Boolean> cancelled) throws IOException {
statusReporter.startNewIncrementalCount();
try (JsonParser parser = new JsonFactory().createParser(inputStream)) {
writeJson(parser);
writeJson(parser, cancelled);
// this line can throw and will be propagated
statusReporter.finishReporting();
@ -72,7 +73,7 @@ class JsonDataToProcessWriter extends AbstractDataToProcessWriter {
return statusReporter.incrementalStats();
}
private void writeJson(JsonParser parser) throws IOException {
private void writeJson(JsonParser parser, Supplier<Boolean> cancelled) throws IOException {
Collection<String> analysisFields = inputFields();
buildTransformsAndWriteHeader(analysisFields.toArray(new String[0]));
@ -101,7 +102,7 @@ class JsonDataToProcessWriter extends AbstractDataToProcessWriter {
record[inOut.outputIndex] = (field == null) ? "" : field;
}
applyTransformsAndWrite(input, record, inputFieldCount);
applyTransformsAndWrite(cancelled, input, record, inputFieldCount);
inputFieldCount = recordReader.read(input, gotFields);
}

View File

@ -13,6 +13,7 @@ import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collection;
import java.util.Map;
import java.util.function.Supplier;
import org.apache.logging.log4j.Logger;
@ -43,7 +44,7 @@ public class SingleLineDataToProcessWriter extends AbstractDataToProcessWriter {
}
@Override
public DataCounts write(InputStream inputStream) throws IOException {
public DataCounts write(InputStream inputStream, Supplier<Boolean> cancelled) throws IOException {
statusReporter.startNewIncrementalCount();
try (BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8))) {
@ -56,7 +57,7 @@ public class SingleLineDataToProcessWriter extends AbstractDataToProcessWriter {
for (String line = bufferedReader.readLine(); line != null;
line = bufferedReader.readLine()) {
Arrays.fill(record, "");
applyTransformsAndWrite(new String[]{line}, record, 1);
applyTransformsAndWrite(cancelled, new String[]{line}, record, 1);
}
statusReporter.finishReporting();
}

View File

@ -133,7 +133,7 @@ class ScheduledJob {
if (extractedData.isPresent()) {
DataCounts counts;
try {
counts = dataProcessor.processData(jobId, extractedData.get(), DATA_LOAD_PARAMS);
counts = dataProcessor.processData(jobId, extractedData.get(), DATA_LOAD_PARAMS, () -> false);
} catch (Exception e) {
error = new AnalysisProblemException(e);
break;

View File

@ -17,6 +17,7 @@ import org.elasticsearch.xpack.prelert.job.DataDescription;
import org.elasticsearch.xpack.prelert.job.Detector;
import org.elasticsearch.xpack.prelert.job.Job;
import org.elasticsearch.xpack.prelert.job.JobStatus;
import org.elasticsearch.xpack.prelert.job.data.DataProcessor;
import org.elasticsearch.xpack.prelert.job.metadata.Allocation;
import org.elasticsearch.xpack.prelert.job.persistence.JobDataCountsPersister;
import org.elasticsearch.xpack.prelert.job.persistence.JobProvider;
@ -37,6 +38,7 @@ import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.concurrent.ExecutorService;
import java.util.function.Supplier;
import static org.elasticsearch.mock.orig.Mockito.doThrow;
import static org.elasticsearch.mock.orig.Mockito.times;
@ -51,7 +53,7 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
/**
* Calling the {@link AutodetectProcessManager#processData(String, InputStream, DataLoadParams)}
* Calling the {@link DataProcessor#processData(String, InputStream, DataLoadParams, java.util.function.Supplier)}
* method causes an AutodetectCommunicator to be created on demand. Most of these tests have to
* do that before they can assert other things
*/
@ -77,7 +79,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
assertEquals(0, manager.numberOfRunningJobs());
DataLoadParams params = new DataLoadParams(TimeRange.builder().build());
manager.processData("foo", createInputStream(""), params);
manager.processData("foo", createInputStream(""), params, () -> false);
assertEquals(1, manager.numberOfRunningJobs());
}
@ -87,10 +89,11 @@ public class AutodetectProcessManagerTests extends ESTestCase {
DataLoadParams params = mock(DataLoadParams.class);
InputStream inputStream = createInputStream("");
doThrow(new IOException("blah")).when(communicator).writeToJob(inputStream, params);
Supplier<Boolean> cancellable = () -> false;
doThrow(new IOException("blah")).when(communicator).writeToJob(inputStream, params, cancellable);
ESTestCase.expectThrows(ElasticsearchException.class,
() -> manager.processData("foo", inputStream, params));
() -> manager.processData("foo", inputStream, params, cancellable));
}
public void testCloseJob() {
@ -99,7 +102,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
AutodetectProcessManager manager = createManager(communicator);
assertEquals(0, manager.numberOfRunningJobs());
manager.processData("foo", createInputStream(""), mock(DataLoadParams.class));
manager.processData("foo", createInputStream(""), mock(DataLoadParams.class), () -> false);
// job is created
assertEquals(1, manager.numberOfRunningJobs());
@ -111,10 +114,11 @@ public class AutodetectProcessManagerTests extends ESTestCase {
AutodetectCommunicator communicator = mock(AutodetectCommunicator.class);
AutodetectProcessManager manager = createManager(communicator);
Supplier<Boolean> cancellable = () -> false;
DataLoadParams params = new DataLoadParams(TimeRange.builder().startTime("1000").endTime("2000").build(), true);
InputStream inputStream = createInputStream("");
manager.processData("foo", inputStream, params);
verify(communicator).writeToJob(inputStream, params);
manager.processData("foo", inputStream, params, cancellable);
verify(communicator).writeToJob(inputStream, params, cancellable);
}
public void testFlush() throws IOException {
@ -123,7 +127,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
when(jobManager.getJobOrThrowIfUnknown("foo")).thenReturn(createJobDetails("foo"));
InputStream inputStream = createInputStream("");
manager.processData("foo", inputStream, mock(DataLoadParams.class));
manager.processData("foo", inputStream, mock(DataLoadParams.class), () -> false);
InterimResultsParams params = InterimResultsParams.builder().build();
manager.flushJob("foo", params);
@ -154,7 +158,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
AutodetectProcessManager manager = createManager(communicator);
assertFalse(manager.jobHasActiveAutodetectProcess("foo"));
manager.processData("foo", createInputStream(""), mock(DataLoadParams.class));
manager.processData("foo", createInputStream(""), mock(DataLoadParams.class), () -> false);
assertTrue(manager.jobHasActiveAutodetectProcess("foo"));
assertFalse(manager.jobHasActiveAutodetectProcess("bar"));
@ -170,7 +174,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
givenAllocationWithStatus(JobStatus.PAUSING);
InputStream inputStream = createInputStream("");
DataCounts dataCounts = manager.processData("foo", inputStream, mock(DataLoadParams.class));
DataCounts dataCounts = manager.processData("foo", inputStream, mock(DataLoadParams.class), () -> false);
assertThat(dataCounts, equalTo(new DataCounts("foo")));
}
@ -182,7 +186,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
givenAllocationWithStatus(JobStatus.PAUSED);
AutodetectProcessManager manager = createManager(communicator);
InputStream inputStream = createInputStream("");
DataCounts dataCounts = manager.processData("foo", inputStream, mock(DataLoadParams.class));
DataCounts dataCounts = manager.processData("foo", inputStream, mock(DataLoadParams.class), () -> false);
assertThat(dataCounts, equalTo(new DataCounts("foo")));
}
@ -227,7 +231,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
private AutodetectProcessManager createManagerAndCallProcessData(AutodetectCommunicator communicator, String jobId) {
AutodetectProcessManager manager = createManager(communicator);
manager.processData(jobId, createInputStream(""), mock(DataLoadParams.class));
manager.processData(jobId, createInputStream(""), mock(DataLoadParams.class), () -> false);
return manager;
}

View File

@ -42,7 +42,7 @@ public class AutodetectCommunicatorTests extends ESTestCase {
DataLoadParams params = new DataLoadParams(TimeRange.builder().startTime("1").endTime("2").build());
AutodetectProcess process = mockAutodetectProcessWithOutputStream();
try (AutodetectCommunicator communicator = createAutodetectCommunicator(process, mock(AutoDetectResultProcessor.class))) {
communicator.writeToJob(new ByteArrayInputStream(new byte[0]), params);
communicator.writeToJob(new ByteArrayInputStream(new byte[0]), params, () -> false);
Mockito.verify(process).writeResetBucketsControlMessage(params);
}
}
@ -143,10 +143,11 @@ public class AutodetectCommunicatorTests extends ESTestCase {
AutodetectCommunicator communicator = createAutodetectCommunicator(process, mock(AutoDetectResultProcessor.class));
communicator.inUse.set(new CountDownLatch(1));
expectThrows(ElasticsearchStatusException.class, () -> communicator.writeToJob(in, mock(DataLoadParams.class)));
expectThrows(ElasticsearchStatusException.class,
() -> communicator.writeToJob(in, mock(DataLoadParams.class), () -> false));
communicator.inUse.set(null);
communicator.writeToJob(in, mock(DataLoadParams.class));
communicator.writeToJob(in, mock(DataLoadParams.class), () -> false);
}
public void testFlushInUse() throws IOException {

View File

@ -333,7 +333,7 @@ public class AbstractDataToProcessWriterTests extends ESTestCase {
String[] input = { "1", "metricA", "0" };
String[] output = new String[3];
assertFalse(writer.applyTransformsAndWrite(input, output, 3));
assertFalse(writer.applyTransformsAndWrite(() -> false, input, output, 3));
verify(autodetectProcess, never()).writeRecord(output);
verify(statusReporter, never()).reportRecordWritten(anyLong(), anyLong());
@ -344,7 +344,7 @@ public class AbstractDataToProcessWriterTests extends ESTestCase {
// this is ok
input = new String[] { "2", "metricB", "0" };
String[] expectedOutput = { "2", null, null };
assertTrue(writer.applyTransformsAndWrite(input, output, 3));
assertTrue(writer.applyTransformsAndWrite(() -> false, input, output, 3));
verify(autodetectProcess, times(1)).writeRecord(expectedOutput);
verify(statusReporter, times(1)).reportRecordWritten(3, 2000);

View File

@ -5,8 +5,10 @@
*/
package org.elasticsearch.xpack.prelert.job.process.autodetect.writer;
import static org.elasticsearch.xpack.prelert.job.process.autodetect.writer.JsonDataToProcessWriterTests.endLessStream;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
@ -21,8 +23,12 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.tasks.TaskCancelledException;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.prelert.job.DataCounts;
import org.elasticsearch.xpack.prelert.job.process.autodetect.AutodetectProcess;
@ -80,6 +86,32 @@ public class CsvDataToProcessWriterTests extends ESTestCase {
analysisConfig = new AnalysisConfig.Builder(Arrays.asList(detector)).build();
}
public void testWrite_cancel() throws Exception {
InputStream inputStream = endLessStream("time,metric,value\n", "1,,foo\n");
CsvDataToProcessWriter writer = createWriter();
AtomicBoolean cancel = new AtomicBoolean(false);
AtomicReference<Exception> exception = new AtomicReference<>();
Thread t = new Thread(() -> {
try {
writer.write(inputStream, cancel::get);
} catch (Exception e) {
exception.set(e);
}
});
t.start();
try {
assertBusy(() -> verify(statusReporter, atLeastOnce()).reportRecordWritten(anyLong(), anyLong()));
} finally {
cancel.set(true);
t.join();
}
assertNotNull(exception.get());
assertEquals(TaskCancelledException.class, exception.get().getClass());
assertEquals("cancelled", exception.get().getMessage());
}
public void testWrite_GivenTimeFormatIsEpochAndDataIsValid()
throws IOException {
StringBuilder input = new StringBuilder();
@ -89,7 +121,7 @@ public class CsvDataToProcessWriterTests extends ESTestCase {
InputStream inputStream = createInputStream(input.toString());
CsvDataToProcessWriter writer = createWriter();
writer.write(inputStream);
writer.write(inputStream, () -> false);
verify(statusReporter, times(1)).startNewIncrementalCount();
List<String[]> expectedRecords = new ArrayList<>();
@ -121,7 +153,7 @@ public class CsvDataToProcessWriterTests extends ESTestCase {
InputStream inputStream = createInputStream(input.toString());
CsvDataToProcessWriter writer = createWriter();
writer.write(inputStream);
writer.write(inputStream, () -> false);
verify(statusReporter, times(1)).startNewIncrementalCount();
List<String[]> expectedRecords = new ArrayList<>();
@ -144,7 +176,7 @@ public class CsvDataToProcessWriterTests extends ESTestCase {
InputStream inputStream = createInputStream(input.toString());
CsvDataToProcessWriter writer = createWriter();
writer.write(inputStream);
writer.write(inputStream, () -> false);
verify(statusReporter, times(1)).startNewIncrementalCount();
List<String[]> expectedRecords = new ArrayList<>();
@ -169,7 +201,7 @@ public class CsvDataToProcessWriterTests extends ESTestCase {
when(statusReporter.getLatestRecordTime()).thenReturn(new Date(5000L));
CsvDataToProcessWriter writer = createWriter();
writer.write(inputStream);
writer.write(inputStream, () -> false);
verify(statusReporter, times(1)).startNewIncrementalCount();
List<String[]> expectedRecords = new ArrayList<>();
@ -200,7 +232,7 @@ public class CsvDataToProcessWriterTests extends ESTestCase {
InputStream inputStream = createInputStream(input.toString());
CsvDataToProcessWriter writer = createWriter();
writer.write(inputStream);
writer.write(inputStream, () -> false);
verify(statusReporter, times(1)).startNewIncrementalCount();
List<String[]> expectedRecords = new ArrayList<>();
@ -234,7 +266,7 @@ public class CsvDataToProcessWriterTests extends ESTestCase {
InputStream inputStream = createInputStream(input.toString());
CsvDataToProcessWriter writer = createWriter();
writer.write(inputStream);
writer.write(inputStream, () -> false);
verify(statusReporter, times(1)).startNewIncrementalCount();
List<String[]> expectedRecords = new ArrayList<>();
@ -265,7 +297,7 @@ public class CsvDataToProcessWriterTests extends ESTestCase {
InputStream inputStream = createInputStream("");
CsvDataToProcessWriter writer = createWriter();
DataCounts counts = writer.write(inputStream);
DataCounts counts = writer.write(inputStream, () -> false);
assertEquals(0L, counts.getInputBytes());
assertEquals(0L, counts.getInputRecordCount());
}
@ -292,7 +324,7 @@ public class CsvDataToProcessWriterTests extends ESTestCase {
input.append("1970-01-01,00:00:02Z,foo,6.0\n");
InputStream inputStream = createInputStream(input.toString());
writer.write(inputStream);
writer.write(inputStream, () -> false);
verify(statusReporter, times(1)).startNewIncrementalCount();
List<String[]> expectedRecords = new ArrayList<>();
@ -331,7 +363,7 @@ public class CsvDataToProcessWriterTests extends ESTestCase {
InputStream inputStream = createInputStream(input.toString());
CsvDataToProcessWriter writer = createWriter();
writer.write(inputStream);
writer.write(inputStream, () -> false);
verify(statusReporter, times(1)).startNewIncrementalCount();
List<String[]> expectedRecords = new ArrayList<>();
@ -357,7 +389,7 @@ public class CsvDataToProcessWriterTests extends ESTestCase {
InputStream inputStream = createInputStream(input.toString());
CsvDataToProcessWriter writer = createWriter();
SuperCsvException e = ESTestCase.expectThrows(SuperCsvException.class, () -> writer.write(inputStream));
SuperCsvException e = ESTestCase.expectThrows(SuperCsvException.class, () -> writer.write(inputStream, () -> false));
// Expected line numbers are 2 and 10001, but SuperCSV may print the
// numbers using a different locale's digit characters
assertTrue(e.getMessage(), e.getMessage().matches(

View File

@ -69,7 +69,7 @@ public class DataWithTransformsToProcessWriterTests extends ESTestCase {
InputStream inputStream = createInputStream(input.toString());
AbstractDataToProcessWriter writer = createWriter(true);
writer.write(inputStream);
writer.write(inputStream, () -> false);
List<String[]> expectedRecords = new ArrayList<>();
// The final field is the control field
@ -92,7 +92,7 @@ public class DataWithTransformsToProcessWriterTests extends ESTestCase {
InputStream inputStream = createInputStream(input.toString());
AbstractDataToProcessWriter writer = createWriter(false);
writer.write(inputStream);
writer.write(inputStream, () -> false);
List<String[]> expectedRecords = new ArrayList<>();
// The final field is the control field

View File

@ -7,6 +7,7 @@ package org.elasticsearch.xpack.prelert.job.process.autodetect.writer;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
@ -19,9 +20,12 @@ import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.tasks.TaskCancelledException;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.prelert.job.process.autodetect.AutodetectProcess;
import org.junit.Before;
@ -77,6 +81,32 @@ public class JsonDataToProcessWriterTests extends ESTestCase {
analysisConfig = new AnalysisConfig.Builder(Arrays.asList(detector)).build();
}
public void testWrite_cancel() throws Exception {
InputStream inputStream = endLessStream("", "{\"time\":1}");
JsonDataToProcessWriter writer = createWriter();
AtomicBoolean cancel = new AtomicBoolean(false);
AtomicReference<Exception> exception = new AtomicReference<>();
Thread t = new Thread(() -> {
try {
writer.write(inputStream, cancel::get);
} catch (Exception e) {
exception.set(e);
}
});
t.start();
try {
assertBusy(() -> verify(statusReporter, atLeastOnce()).reportRecordWritten(anyLong(), anyLong()));
} finally {
cancel.set(true);
t.join();
}
assertNotNull(exception.get());
assertEquals(TaskCancelledException.class, exception.get().getClass());
assertEquals("cancelled", exception.get().getMessage());
}
public void testWrite_GivenTimeFormatIsEpochAndDataIsValid() throws Exception {
StringBuilder input = new StringBuilder();
input.append("{\"time\":\"1\", \"metric\":\"foo\", \"value\":\"1.0\"}");
@ -84,7 +114,7 @@ public class JsonDataToProcessWriterTests extends ESTestCase {
InputStream inputStream = createInputStream(input.toString());
JsonDataToProcessWriter writer = createWriter();
writer.write(inputStream);
writer.write(inputStream, () -> false);
verify(statusReporter, times(1)).startNewIncrementalCount();
List<String[]> expectedRecords = new ArrayList<>();
@ -106,7 +136,7 @@ public class JsonDataToProcessWriterTests extends ESTestCase {
InputStream inputStream = createInputStream(input.toString());
JsonDataToProcessWriter writer = createWriter();
writer.write(inputStream);
writer.write(inputStream, () -> false);
verify(statusReporter, times(1)).startNewIncrementalCount();
List<String[]> expectedRecords = new ArrayList<>();
@ -135,7 +165,7 @@ public class JsonDataToProcessWriterTests extends ESTestCase {
InputStream inputStream = createInputStream(input.toString());
JsonDataToProcessWriter writer = createWriter();
writer.write(inputStream);
writer.write(inputStream, () -> false);
List<String[]> expectedRecords = new ArrayList<>();
// The final field is the control field
@ -164,7 +194,7 @@ public class JsonDataToProcessWriterTests extends ESTestCase {
InputStream inputStream = createInputStream(input.toString());
JsonDataToProcessWriter writer = createWriter();
writer.write(inputStream);
writer.write(inputStream, () -> false);
verify(statusReporter, times(1)).startNewIncrementalCount();
List<String[]> expectedRecords = new ArrayList<>();
@ -193,7 +223,7 @@ public class JsonDataToProcessWriterTests extends ESTestCase {
InputStream inputStream = createInputStream(input.toString());
JsonDataToProcessWriter writer = createWriter();
writer.write(inputStream);
writer.write(inputStream, () -> false);
verify(statusReporter, times(1)).startNewIncrementalCount();
List<String[]> expectedRecords = new ArrayList<>();
@ -219,7 +249,7 @@ public class JsonDataToProcessWriterTests extends ESTestCase {
InputStream inputStream = createInputStream(input.toString());
JsonDataToProcessWriter writer = createWriter();
ESTestCase.expectThrows(ElasticsearchParseException.class, () -> writer.write(inputStream));
ESTestCase.expectThrows(ElasticsearchParseException.class, () -> writer.write(inputStream, () -> false));
}
public void testWrite_GivenJsonWithArrayField()
@ -234,7 +264,7 @@ public class JsonDataToProcessWriterTests extends ESTestCase {
InputStream inputStream = createInputStream(input.toString());
JsonDataToProcessWriter writer = createWriter();
writer.write(inputStream);
writer.write(inputStream, () -> false);
verify(statusReporter, times(1)).startNewIncrementalCount();
List<String[]> expectedRecords = new ArrayList<>();
@ -263,7 +293,7 @@ public class JsonDataToProcessWriterTests extends ESTestCase {
InputStream inputStream = createInputStream(input.toString());
JsonDataToProcessWriter writer = createWriter();
writer.write(inputStream);
writer.write(inputStream, () -> false);
verify(statusReporter, times(1)).startNewIncrementalCount();
List<String[]> expectedRecords = new ArrayList<>();
@ -304,7 +334,7 @@ public class JsonDataToProcessWriterTests extends ESTestCase {
input.append("{\"date\":\"1970-01-01\", \"time-of-day\":\"00:00:02Z\", \"value\":\"6.0\"}");
InputStream inputStream = createInputStream(input.toString());
writer.write(inputStream);
writer.write(inputStream, () -> false);
verify(statusReporter, times(1)).startNewIncrementalCount();
List<String[]> expectedRecords = new ArrayList<>();
@ -341,7 +371,7 @@ public class JsonDataToProcessWriterTests extends ESTestCase {
InputStream inputStream = createInputStream(input.toString());
JsonDataToProcessWriter writer = createWriter();
writer.write(inputStream);
writer.write(inputStream, () -> false);
verify(statusReporter, times(1)).startNewIncrementalCount();
List<String[]> expectedRecords = new ArrayList<>();
@ -370,4 +400,31 @@ public class JsonDataToProcessWriterTests extends ESTestCase {
assertArrayEquals(expectedRecords.get(i), writtenRecords.get(i));
}
}
// Yields firstLine once, then repeats repeatLine forever. The stream never
// signals end-of-input, so a writer reading from it can only stop when the
// cancellation supplier trips.
static InputStream endLessStream(String firstLine, String repeatLine) {
return new InputStream() {
int pos = 0;
boolean firstLineRead = false;
final byte[] first = firstLine.getBytes(StandardCharsets.UTF_8);
final byte[] repeat = repeatLine.getBytes(StandardCharsets.UTF_8);
@Override
public int read() throws IOException {
if (firstLineRead == false) {
if (pos == first.length) {
pos = 0;
firstLineRead = true;
} else {
return first[pos++];
}
}
if (pos == repeat.length) {
pos = 0;
}
return repeat[pos++];
}
};
}
}

View File

@ -5,7 +5,10 @@
*/
package org.elasticsearch.xpack.prelert.job.process.autodetect.writer;
import static org.elasticsearch.xpack.prelert.job.process.autodetect.writer.JsonDataToProcessWriterTests.endLessStream;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@ -18,8 +21,11 @@ import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.tasks.TaskCancelledException;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.prelert.job.process.autodetect.AutodetectProcess;
import org.junit.Before;
@ -71,6 +77,38 @@ public class SingleLineDataToProcessWriterTests extends ESTestCase {
transformConfigs = new ArrayList<>();
}
public void testWrite_cancel() throws Exception {
TransformConfig transformConfig = new TransformConfig("extract");
transformConfig.setInputs(Arrays.asList("raw"));
transformConfig.setOutputs(Arrays.asList("time", "message"));
transformConfig.setArguments(Arrays.asList("(.{20}) (.*)"));
transformConfigs.add(transformConfig);
InputStream inputStream = endLessStream("", "2015-04-29 10:00:00Z this is a message\n");
SingleLineDataToProcessWriter writer = createWriter();
AtomicBoolean cancel = new AtomicBoolean(false);
AtomicReference<Exception> exception = new AtomicReference<>();
Thread t = new Thread(() -> {
try {
writer.write(inputStream, cancel::get);
} catch (Exception e) {
exception.set(e);
}
});
t.start();
try {
assertBusy(() -> verify(statusReporter, atLeastOnce()).reportRecordWritten(anyLong(), anyLong()));
} finally {
cancel.set(true);
t.join();
}
assertNotNull(exception.get());
assertEquals(TaskCancelledException.class, exception.get().getClass());
assertEquals("cancelled", exception.get().getMessage());
}
public void testWrite_GivenDataIsValid() throws Exception {
TransformConfig transformConfig = new TransformConfig("extract");
transformConfig.setInputs(Arrays.asList("raw"));
@ -85,7 +123,7 @@ public class SingleLineDataToProcessWriterTests extends ESTestCase {
InputStream inputStream = createInputStream(input.toString());
SingleLineDataToProcessWriter writer = createWriter();
writer.write(inputStream);
writer.write(inputStream, () -> false);
verify(statusReporter, times(1)).getLatestRecordTime();
verify(statusReporter, times(1)).startNewIncrementalCount();
verify(statusReporter, times(1)).setAnalysedFieldsPerRecord(1);
@ -122,7 +160,7 @@ public class SingleLineDataToProcessWriterTests extends ESTestCase {
InputStream inputStream = createInputStream(input.toString());
SingleLineDataToProcessWriter writer = createWriter();
writer.write(inputStream);
writer.write(inputStream, () -> false);
verify(statusReporter, times(1)).getLatestRecordTime();
verify(statusReporter, times(1)).startNewIncrementalCount();
verify(statusReporter, times(1)).setAnalysedFieldsPerRecord(1);
@ -148,7 +186,7 @@ public class SingleLineDataToProcessWriterTests extends ESTestCase {
InputStream inputStream = createInputStream(input.toString());
SingleLineDataToProcessWriter writer = createWriter();
writer.write(inputStream);
writer.write(inputStream, () -> false);
verify(statusReporter, times(1)).startNewIncrementalCount();
verify(statusReporter, times(1)).setAnalysedFieldsPerRecord(1);
verify(statusReporter, times(1)).reportDateParseError(1);

View File

@ -35,7 +35,6 @@ import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.Charset;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
@ -104,7 +103,7 @@ public class ScheduledJobServiceTests extends ESTestCase {
when(dataExtractor.hasNext()).thenReturn(true).thenReturn(false);
InputStream in = new ByteArrayInputStream("".getBytes(Charset.forName("utf-8")));
when(dataExtractor.next()).thenReturn(Optional.of(in));
when(dataProcessor.processData(anyString(), eq(in), any())).thenReturn(dataCounts);
when(dataProcessor.processData(anyString(), eq(in), any(), any())).thenReturn(dataCounts);
scheduledJobService.start(builder.build(), allocation);
verify(dataExtractor).newSearch(eq(0L), eq(60000L), any());
@ -126,7 +125,7 @@ public class ScheduledJobServiceTests extends ESTestCase {
when(dataExtractor.hasNext()).thenReturn(true).thenReturn(false);
InputStream in = new ByteArrayInputStream("".getBytes(Charset.forName("utf-8")));
when(dataExtractor.next()).thenReturn(Optional.of(in));
when(dataProcessor.processData(anyString(), eq(in), any())).thenReturn(dataCounts);
when(dataProcessor.processData(anyString(), eq(in), any(), any())).thenReturn(dataCounts);
scheduledJobService.start(builder.build(), allocation);
verify(dataExtractor).newSearch(eq(0L), eq(60000L), any());

View File

@ -49,7 +49,7 @@ public class ScheduledJobTests extends ESTestCase {
InputStream inputStream = mock(InputStream.class);
when(dataExtractor.next()).thenReturn(Optional.of(inputStream));
DataCounts dataCounts = new DataCounts("_job_id", 1, 0, 0, 0, 0, 0, 0, new Date(0), new Date(0));
when(dataProcessor.processData(eq("_job_id"), same(inputStream), any())).thenReturn(dataCounts);
when(dataProcessor.processData(eq("_job_id"), same(inputStream), any(), any())).thenReturn(dataCounts);
}
public void testLookBackRunWithEndTime() throws Exception {
@ -141,7 +141,7 @@ public class ScheduledJobTests extends ESTestCase {
public void testAnalysisProblem() throws Exception {
dataProcessor = mock(DataProcessor.class);
when(dataProcessor.processData(eq("_job_id"), any(), any())).thenThrow(new RuntimeException());
when(dataProcessor.processData(eq("_job_id"), any(), any(), any())).thenThrow(new RuntimeException());
ScheduledJob scheduledJob = createScheduledJob(1000, 500, -1, -1);
SchedulerState schedulerState = new SchedulerState(JobSchedulerStatus.STARTED, 0L, 1000L);