[ML] Wait for dataCounts to be persisted (elastic/x-pack-elasticsearch#952)

Original commit: elastic/x-pack-elasticsearch@48ca4d7363
This commit is contained in:
David Kyle 2017-04-05 08:45:01 +01:00 committed by GitHub
parent 843a0d8b3f
commit 7b45460951
11 changed files with 131 additions and 111 deletions

View File

@ -261,13 +261,13 @@ public class DataCountsReporter extends AbstractComponent {
/**
* Report the counts now regardless of whether or not we are at a reporting boundary.
*/
public void finishReporting() {
public void finishReporting(ActionListener<Boolean> listener) {
Date now = new Date();
incrementalRecordStats.setLastDataTimeStamp(now);
totalRecordStats.setLastDataTimeStamp(now);
diagnostics.flush();
retrieveDiagnosticsIntermediateResults();
dataCountsPersister.persistDataCounts(job.getId(), runningTotalStats(), new LoggingActionListener());
dataCountsPersister.persistDataCounts(job.getId(), runningTotalStats(), listener);
}
/**
@ -368,19 +368,4 @@ public class DataCountsReporter extends AbstractComponent {
diagnostics.resetCounts();
}
/**
* Log success/error
*/
private class LoggingActionListener implements ActionListener<Boolean> {
@Override
public void onResponse(Boolean aBoolean) {
logger.trace("[{}] Persisted DataCounts", job.getId());
}
@Override
public void onFailure(Exception e) {
logger.debug(new ParameterizedMessage("[{}] Error persisting DataCounts stats", job.getId()), e);
}
}
}

View File

@ -34,10 +34,14 @@ import java.io.InputStream;
import java.time.Duration;
import java.time.ZonedDateTime;
import java.util.List;
import java.util.Locale;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
@ -78,18 +82,34 @@ public class AutodetectCommunicator implements Closeable {
}
public void writeToJob(InputStream inputStream, XContentType xContentType,
DataLoadParams params, BiConsumer<DataCounts, Exception> handler) throws IOException {
DataLoadParams params, BiConsumer<DataCounts, Exception> handler) {
submitOperation(() -> {
if (params.isResettingBuckets()) {
autodetectProcess.writeResetBucketsControlMessage(params);
}
CountingInputStream countingStream = new CountingInputStream(inputStream, dataCountsReporter);
CountingInputStream countingStream = new CountingInputStream(inputStream, dataCountsReporter);
DataToProcessWriter autoDetectWriter = createProcessWriter(params.getDataDescription());
DataCounts results = autoDetectWriter.write(countingStream, xContentType);
autoDetectWriter.flush();
return results;
}, handler);
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<DataCounts> dataCountsAtomicReference = new AtomicReference<>();
AtomicReference<Exception> exceptionAtomicReference = new AtomicReference<>();
autoDetectWriter.write(countingStream, xContentType, (dataCounts, e) -> {
dataCountsAtomicReference.set(dataCounts);
exceptionAtomicReference.set(e);
latch.countDown();
});
latch.await();
autoDetectWriter.flushStream();
if (exceptionAtomicReference.get() != null) {
throw exceptionAtomicReference.get();
} else {
return dataCountsAtomicReference.get();
}
},
handler);
}
@Override
@ -123,7 +143,7 @@ public class AutodetectCommunicator implements Closeable {
}
public void writeUpdateProcessMessage(ModelPlotConfig config, List<JobUpdate.DetectorUpdate> updates,
BiConsumer<Void, Exception> handler) throws IOException {
BiConsumer<Void, Exception> handler) {
submitOperation(() -> {
if (config != null) {
autodetectProcess.writeUpdateModelPlotMessage(config);
@ -139,7 +159,7 @@ public class AutodetectCommunicator implements Closeable {
}, handler);
}
public void flushJob(InterimResultsParams params, BiConsumer<Void, Exception> handler) throws IOException {
public void flushJob(InterimResultsParams params, BiConsumer<Void, Exception> handler) {
submitOperation(() -> {
String flushId = autodetectProcess.flushJob(params);
waitFlushToCompletion(flushId);
@ -147,8 +167,8 @@ public class AutodetectCommunicator implements Closeable {
}, handler);
}
private void waitFlushToCompletion(String flushId) throws IOException {
LOGGER.info("[{}] waiting for flush", job.getId());
private void waitFlushToCompletion(String flushId) {
LOGGER.debug("[{}] waiting for flush", job.getId());
try {
boolean isFlushComplete = autoDetectResultProcessor.waitForFlushAcknowledgement(flushId, FLUSH_PROCESS_CHECK_FREQUENCY);
@ -191,10 +211,17 @@ public class AutodetectCommunicator implements Closeable {
return dataCountsReporter.runningTotalStats();
}
private <T> void submitOperation(CheckedSupplier<T, IOException> operation, BiConsumer<T, Exception> handler) throws IOException {
private <T> void submitOperation(CheckedSupplier<T, Exception> operation, BiConsumer<T, Exception> handler) {
autodetectWorkerExecutor.execute(new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
if (e.getCause() instanceof TimeoutException) {
LOGGER.warn("Connection to process was dropped due to a timeout - if you are feeding this job from a connector it " +
"may be that your connector stalled for too long", e.getCause());
} else {
LOGGER.error(new ParameterizedMessage("[{}] Unexpected exception writing to process", job.getId()), e);
}
handler.accept(null, e);
}
@ -205,5 +232,4 @@ public class AutodetectCommunicator implements Closeable {
}
});
}
}

View File

@ -144,24 +144,12 @@ public class AutodetectProcessManager extends AbstractComponent {
* @param handler Delegate error or datacount results (Count of records, fields, bytes, etc written)
*/
public void processData(String jobId, InputStream input, XContentType xContentType,
DataLoadParams params, BiConsumer<DataCounts, Exception> handler) {
DataLoadParams params, BiConsumer<DataCounts, Exception> handler) {
AutodetectCommunicator communicator = autoDetectCommunicatorByJob.get(jobId);
if (communicator == null) {
throw new IllegalStateException("[" + jobId + "] Cannot process data: no active autodetect process for job");
}
try {
communicator.writeToJob(input, xContentType, params, handler);
// TODO check for errors from autodetect
} catch (IOException e) {
String msg = String.format(Locale.ROOT, "Exception writing to process for job %s", jobId);
if (e.getCause() instanceof TimeoutException) {
logger.warn("Connection to process was dropped due to a timeout - if you are feeding this job from a connector it " +
"may be that your connector stalled for too long", e.getCause());
} else {
logger.error("Unexpected exception", e);
}
throw ExceptionsHelper.serverError(msg, e);
}
communicator.writeToJob(input, xContentType, params, handler);
}
/**
@ -181,26 +169,20 @@ public class AutodetectProcessManager extends AbstractComponent {
logger.debug(message);
throw new IllegalArgumentException(message);
}
try {
communicator.flushJob(params, (aVoid, e) -> {
if (e == null) {
handler.accept(null);
} else {
String msg = String.format(Locale.ROOT, "[%s] exception while flushing job", jobId);
logger.error(msg);
handler.accept(ExceptionsHelper.serverError(msg, e));
}
});
// TODO check for errors from autodetect
} catch (IOException ioe) {
String msg = String.format(Locale.ROOT, "[%s] exception while flushing job", jobId);
logger.error(msg);
throw ExceptionsHelper.serverError(msg, ioe);
}
communicator.flushJob(params, (aVoid, e) -> {
if (e == null) {
handler.accept(null);
} else {
String msg = String.format(Locale.ROOT, "[%s] exception while flushing job", jobId);
logger.error(msg);
handler.accept(ExceptionsHelper.serverError(msg, e));
}
});
}
public void writeUpdateProcessMessage(String jobId, List<JobUpdate.DetectorUpdate> updates, ModelPlotConfig config,
Consumer<Exception> handler) throws IOException {
Consumer<Exception> handler) {
AutodetectCommunicator communicator = autoDetectCommunicatorByJob.get(jobId);
if (communicator == null) {
logger.debug("Cannot update model debug config: no active autodetect process for job {}", jobId);
@ -214,7 +196,6 @@ public class AutodetectProcessManager extends AbstractComponent {
handler.accept(e);
}
});
// TODO check for errors from autodetects
}
public void openJob(String jobId, JobTask jobTask, boolean ignoreDowntime, Consumer<Exception> handler) {

View File

@ -6,12 +6,15 @@
package org.elasticsearch.xpack.ml.job.process.autodetect.writer;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.xpack.ml.job.config.AnalysisConfig;
import org.elasticsearch.xpack.ml.job.config.DataDescription;
import org.elasticsearch.xpack.ml.job.process.DataCountsReporter;
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcess;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@ -25,6 +28,7 @@ import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.BiConsumer;
public abstract class AbstractDataToProcessWriter implements DataToProcessWriter {
@ -77,7 +81,7 @@ public abstract class AbstractDataToProcessWriter implements DataToProcessWriter
/**
* Set up the field index mappings. This must be called before
* {@linkplain DataToProcessWriter#write(java.io.InputStream, org.elasticsearch.common.xcontent.XContentType)}.
* {@linkplain DataToProcessWriter#write(InputStream, XContentType, BiConsumer)}
* <p>
* Finds the required input indexes in the <code>header</code> and sets the
* mappings to the corresponding output indexes.
@ -162,7 +166,7 @@ public abstract class AbstractDataToProcessWriter implements DataToProcessWriter
}
@Override
public void flush() throws IOException {
public void flushStream() throws IOException {
autodetectProcess.flushStream();
}

View File

@ -6,6 +6,7 @@
package org.elasticsearch.xpack.ml.job.process.autodetect.writer;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.xpack.ml.job.config.AnalysisConfig;
@ -25,6 +26,7 @@ import java.util.Collection;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.function.BiConsumer;
/**
* A writer for transforming and piping CSV data from an
@ -66,7 +68,7 @@ class CsvDataToProcessWriter extends AbstractDataToProcessWriter {
* header a exception is thrown
*/
@Override
public DataCounts write(InputStream inputStream, XContentType xContentType) throws IOException {
public void write(InputStream inputStream, XContentType xContentType, BiConsumer<DataCounts, Exception> handler) throws IOException {
CsvPreference csvPref = new CsvPreference.Builder(
dataDescription.getQuoteCharacter(),
dataDescription.getFieldDelimiter(),
@ -78,7 +80,8 @@ class CsvDataToProcessWriter extends AbstractDataToProcessWriter {
try (CsvListReader csvReader = new CsvListReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8), csvPref)) {
String[] header = csvReader.getHeader(true);
if (header == null) { // null if EoF
return dataCountsReporter.incrementalStats();
handler.accept(dataCountsReporter.incrementalStats(), null);
return;
}
long inputFieldCount = Math.max(header.length - 1, 0); // time field doesn't count
@ -124,10 +127,11 @@ class CsvDataToProcessWriter extends AbstractDataToProcessWriter {
}
// This function can throw
dataCountsReporter.finishReporting();
dataCountsReporter.finishReporting(ActionListener.wrap(
response -> handler.accept(dataCountsReporter.incrementalStats(), null),
e -> handler.accept(null, e)
));
}
return dataCountsReporter.incrementalStats();
}
private static void fillRecordFromLine(List<String> line, String[] record) {

View File

@ -10,10 +10,10 @@ import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts;
import java.io.IOException;
import java.io.InputStream;
import java.util.function.BiConsumer;
/**
* A writer for transforming and piping data from an
* inputstream to outputstream as the process expects.
* Interface defining writers to a {@link org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcess}
*/
public interface DataToProcessWriter {
@ -25,18 +25,15 @@ public interface DataToProcessWriter {
void writeHeader() throws IOException;
/**
* Reads the inputIndex, transform to length encoded values and pipe
* to the OutputStream.
* Write the contents of <code>inputStream</code>.
* If any of the fields in <code>analysisFields</code> or the
* <code>DataDescription</code>s timeField is missing from the CSV header
* a <code>MissingFieldException</code> is thrown
*
* @return Counts of the records processed, bytes read etc
*/
DataCounts write(InputStream inputStream, XContentType xContentType) throws IOException;
void write(InputStream inputStream, XContentType xContentType, BiConsumer<DataCounts, Exception> handler) throws IOException;
/**
* Flush the outputstream
*/
void flush() throws IOException;
void flushStream() throws IOException;
}

View File

@ -6,6 +6,7 @@
package org.elasticsearch.xpack.ml.job.process.autodetect.writer;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentFactory;
@ -22,6 +23,7 @@ import java.io.InputStream;
import java.util.Arrays;
import java.util.Collection;
import java.util.Map;
import java.util.function.BiConsumer;
/**
* A writer for transforming and piping JSON data from an
@ -52,7 +54,8 @@ class JsonDataToProcessWriter extends AbstractDataToProcessWriter {
* timeField is missing from the JOSN inputIndex an exception is thrown
*/
@Override
public DataCounts write(InputStream inputStream, XContentType xContentType) throws IOException {
public void write(InputStream inputStream, XContentType xContentType, BiConsumer<DataCounts, Exception> handler)
throws IOException {
dataCountsReporter.startNewIncrementalCount();
try (XContentParser parser = XContentFactory.xContent(xContentType)
@ -60,10 +63,12 @@ class JsonDataToProcessWriter extends AbstractDataToProcessWriter {
writeJson(parser);
// this line can throw and will be propagated
dataCountsReporter.finishReporting();
dataCountsReporter.finishReporting(
ActionListener.wrap(
response -> handler.accept(dataCountsReporter.incrementalStats(), null),
e -> handler.accept(null, e)
));
}
return dataCountsReporter.incrementalStats();
}
private void writeJson(XContentParser parser) throws IOException {

View File

@ -5,6 +5,7 @@
*/
package org.elasticsearch.xpack.ml.job.process;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.env.Environment;
@ -132,7 +133,7 @@ public class DataCountsReporterTests extends ESTestCase {
assertEquals(302000, dataCountsReporter.runningTotalStats().getLatestRecordTimeStamp().getTime());
// send 'flush' signal
dataCountsReporter.finishReporting();
dataCountsReporter.finishReporting(ActionListener.wrap(r -> {}, e -> {}));
assertEquals(2, dataCountsReporter.runningTotalStats().getBucketCount());
assertEquals(0, dataCountsReporter.runningTotalStats().getEmptyBucketCount());
assertEquals(0, dataCountsReporter.runningTotalStats().getSparseBucketCount());
@ -268,7 +269,7 @@ public class DataCountsReporterTests extends ESTestCase {
dataCountsReporter.reportRecordWritten(5, 2000);
dataCountsReporter.reportRecordWritten(5, 3000);
dataCountsReporter.reportMissingField();
dataCountsReporter.finishReporting();
dataCountsReporter.finishReporting(ActionListener.wrap(r -> {}, e -> {}));
long lastReportedTimeMs = dataCountsReporter.incrementalStats().getLastDataTimeStamp().getTime();
// check last data time is equal to now give or take a second

View File

@ -6,6 +6,7 @@
package org.elasticsearch.xpack.ml.job.process.autodetect;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.test.ESTestCase;
@ -133,6 +134,10 @@ public class AutodetectCommunicatorTests extends ESTestCase {
return null;
}).when(executorService).execute(any(Runnable.class));
DataCountsReporter dataCountsReporter = mock(DataCountsReporter.class);
doAnswer(invocation -> {
((ActionListener<Boolean>) invocation.getArguments()[0]).onResponse(true);
return null;
}).when(dataCountsReporter).finishReporting(any());
return new AutodetectCommunicator(createJobDetails(), autodetectProcess,
dataCountsReporter, autoDetectResultProcessor, e -> {
}, new NamedXContentRegistry(Collections.emptyList()), executorService);

View File

@ -5,6 +5,7 @@
*/
package org.elasticsearch.xpack.ml.job.process.autodetect.writer;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.ml.job.config.AnalysisConfig;
@ -79,7 +80,7 @@ public class CsvDataToProcessWriterTests extends ESTestCase {
InputStream inputStream = createInputStream(input.toString());
CsvDataToProcessWriter writer = createWriter();
writer.writeHeader();
writer.write(inputStream, null);
writer.write(inputStream, null, (r, e) -> {});
verify(dataCountsReporter, times(1)).startNewIncrementalCount();
List<String[]> expectedRecords = new ArrayList<>();
@ -89,7 +90,7 @@ public class CsvDataToProcessWriterTests extends ESTestCase {
expectedRecords.add(new String[] { "2", "2.0", "" });
assertWrittenRecordsEqualTo(expectedRecords);
verify(dataCountsReporter).finishReporting();
verify(dataCountsReporter).finishReporting(any());
}
public void testWrite_GivenTimeFormatIsEpochAndTimestampsAreOutOfOrder() throws IOException {
@ -101,7 +102,7 @@ public class CsvDataToProcessWriterTests extends ESTestCase {
InputStream inputStream = createInputStream(input.toString());
CsvDataToProcessWriter writer = createWriter();
writer.writeHeader();
writer.write(inputStream, null);
writer.write(inputStream, null, (r, e) -> {});
verify(dataCountsReporter, times(1)).startNewIncrementalCount();
List<String[]> expectedRecords = new ArrayList<>();
@ -112,7 +113,7 @@ public class CsvDataToProcessWriterTests extends ESTestCase {
verify(dataCountsReporter, times(2)).reportOutOfOrderRecord(2);
verify(dataCountsReporter, never()).reportLatestTimeIncrementalStats(anyLong());
verify(dataCountsReporter).finishReporting();
verify(dataCountsReporter).finishReporting(any());
}
public void testWrite_GivenTimeFormatIsEpochAndAllRecordsAreOutOfOrder() throws IOException {
@ -125,7 +126,7 @@ public class CsvDataToProcessWriterTests extends ESTestCase {
when(dataCountsReporter.getLatestRecordTime()).thenReturn(new Date(5000L));
CsvDataToProcessWriter writer = createWriter();
writer.writeHeader();
writer.write(inputStream, null);
writer.write(inputStream, null, (r, e) -> {});
verify(dataCountsReporter, times(1)).startNewIncrementalCount();
List<String[]> expectedRecords = new ArrayList<>();
@ -136,7 +137,7 @@ public class CsvDataToProcessWriterTests extends ESTestCase {
verify(dataCountsReporter, times(2)).reportOutOfOrderRecord(2);
verify(dataCountsReporter, times(2)).reportLatestTimeIncrementalStats(anyLong());
verify(dataCountsReporter, never()).reportRecordWritten(anyLong(), anyLong());
verify(dataCountsReporter).finishReporting();
verify(dataCountsReporter).finishReporting(any());
}
public void testWrite_GivenTimeFormatIsEpochAndSomeTimestampsWithinLatencySomeOutOfOrder() throws IOException {
@ -155,7 +156,7 @@ public class CsvDataToProcessWriterTests extends ESTestCase {
InputStream inputStream = createInputStream(input.toString());
CsvDataToProcessWriter writer = createWriter();
writer.writeHeader();
writer.write(inputStream, null);
writer.write(inputStream, null, (r, e) -> {});
verify(dataCountsReporter, times(1)).startNewIncrementalCount();
List<String[]> expectedRecords = new ArrayList<>();
@ -169,7 +170,7 @@ public class CsvDataToProcessWriterTests extends ESTestCase {
verify(dataCountsReporter, times(1)).reportOutOfOrderRecord(2);
verify(dataCountsReporter, never()).reportLatestTimeIncrementalStats(anyLong());
verify(dataCountsReporter).finishReporting();
verify(dataCountsReporter).finishReporting(any());
}
public void testWrite_NullByte() throws IOException {
@ -188,7 +189,7 @@ public class CsvDataToProcessWriterTests extends ESTestCase {
InputStream inputStream = createInputStream(input.toString());
CsvDataToProcessWriter writer = createWriter();
writer.writeHeader();
writer.write(inputStream, null);
writer.write(inputStream, null, (r, e) -> {});
verify(dataCountsReporter, times(1)).startNewIncrementalCount();
List<String[]> expectedRecords = new ArrayList<>();
@ -206,7 +207,7 @@ public class CsvDataToProcessWriterTests extends ESTestCase {
verify(dataCountsReporter, times(1)).reportRecordWritten(2, 3000);
verify(dataCountsReporter, times(1)).reportRecordWritten(2, 4000);
verify(dataCountsReporter, times(1)).reportDateParseError(2);
verify(dataCountsReporter).finishReporting();
verify(dataCountsReporter).finishReporting(any());
}
public void testWrite_EmptyInput() throws IOException {
@ -216,13 +217,24 @@ public class CsvDataToProcessWriterTests extends ESTestCase {
when(dataCountsReporter.incrementalStats()).thenReturn(new DataCounts("foo"));
doAnswer(invocation -> {
ActionListener<Boolean> listener = (ActionListener<Boolean>) invocation.getArguments()[0];
listener.onResponse(true);
return null;
}).when(dataCountsReporter).finishReporting(any());
InputStream inputStream = createInputStream("");
CsvDataToProcessWriter writer = createWriter();
writer.writeHeader();
DataCounts counts = writer.write(inputStream, null);
assertEquals(0L, counts.getInputBytes());
assertEquals(0L, counts.getInputRecordCount());
writer.write(inputStream, null, (counts, e) -> {
if (e != null) {
fail(e.getMessage());
} else {
assertEquals(0L, counts.getInputBytes());
assertEquals(0L, counts.getInputRecordCount());
}
});
}
public void testWrite_GivenMisplacedQuoteMakesRecordExtendOverTooManyLines() throws IOException {
@ -239,7 +251,7 @@ public class CsvDataToProcessWriterTests extends ESTestCase {
writer.writeHeader();
SuperCsvException e = ESTestCase.expectThrows(SuperCsvException.class,
() -> writer.write(inputStream, null));
() -> writer.write(inputStream, null, (response, error) -> {}));
// Expected line numbers are 2 and 10001, but SuperCSV may print the
// numbers using a different locale's digit characters
assertTrue(e.getMessage(), e.getMessage().matches(

View File

@ -79,7 +79,7 @@ public class JsonDataToProcessWriterTests extends ESTestCase {
InputStream inputStream = createInputStream(input.toString());
JsonDataToProcessWriter writer = createWriter();
writer.writeHeader();
writer.write(inputStream, XContentType.JSON);
writer.write(inputStream, XContentType.JSON, (r, e) -> {});
verify(dataCountsReporter, times(1)).startNewIncrementalCount();
List<String[]> expectedRecords = new ArrayList<>();
@ -89,7 +89,7 @@ public class JsonDataToProcessWriterTests extends ESTestCase {
expectedRecords.add(new String[]{"2", "2.0", ""});
assertWrittenRecordsEqualTo(expectedRecords);
verify(dataCountsReporter).finishReporting();
verify(dataCountsReporter).finishReporting(any());
}
public void testWrite_GivenTimeFormatIsEpochAndTimestampsAreOutOfOrder()
@ -101,7 +101,7 @@ public class JsonDataToProcessWriterTests extends ESTestCase {
InputStream inputStream = createInputStream(input.toString());
JsonDataToProcessWriter writer = createWriter();
writer.writeHeader();
writer.write(inputStream, XContentType.JSON);
writer.write(inputStream, XContentType.JSON, (r, e) -> {});
verify(dataCountsReporter, times(1)).startNewIncrementalCount();
List<String[]> expectedRecords = new ArrayList<>();
@ -112,7 +112,7 @@ public class JsonDataToProcessWriterTests extends ESTestCase {
verify(dataCountsReporter, times(2)).reportOutOfOrderRecord(2);
verify(dataCountsReporter, never()).reportLatestTimeIncrementalStats(anyLong());
verify(dataCountsReporter).finishReporting();
verify(dataCountsReporter).finishReporting(any());
}
public void testWrite_GivenTimeFormatIsEpochAndSomeTimestampsWithinLatencySomeOutOfOrder()
@ -130,7 +130,7 @@ public class JsonDataToProcessWriterTests extends ESTestCase {
InputStream inputStream = createInputStream(input.toString());
JsonDataToProcessWriter writer = createWriter();
writer.writeHeader();
writer.write(inputStream, XContentType.JSON);
writer.write(inputStream, XContentType.JSON, (r, e) -> {});
List<String[]> expectedRecords = new ArrayList<>();
// The final field is the control field
@ -143,7 +143,7 @@ public class JsonDataToProcessWriterTests extends ESTestCase {
verify(dataCountsReporter, times(1)).reportOutOfOrderRecord(2);
verify(dataCountsReporter, never()).reportLatestTimeIncrementalStats(anyLong());
verify(dataCountsReporter).finishReporting();
verify(dataCountsReporter).finishReporting(any());
}
public void testWrite_GivenMalformedJsonWithoutNestedLevels()
@ -159,7 +159,7 @@ public class JsonDataToProcessWriterTests extends ESTestCase {
InputStream inputStream = createInputStream(input.toString());
JsonDataToProcessWriter writer = createWriter();
writer.writeHeader();
writer.write(inputStream, XContentType.JSON);
writer.write(inputStream, XContentType.JSON, (r, e) -> {});
verify(dataCountsReporter, times(1)).startNewIncrementalCount();
List<String[]> expectedRecords = new ArrayList<>();
@ -171,7 +171,7 @@ public class JsonDataToProcessWriterTests extends ESTestCase {
assertWrittenRecordsEqualTo(expectedRecords);
verify(dataCountsReporter).reportMissingFields(1);
verify(dataCountsReporter).finishReporting();
verify(dataCountsReporter).finishReporting(any());
}
public void testWrite_GivenMalformedJsonWithNestedLevels()
@ -188,7 +188,7 @@ public class JsonDataToProcessWriterTests extends ESTestCase {
InputStream inputStream = createInputStream(input.toString());
JsonDataToProcessWriter writer = createWriter();
writer.writeHeader();
writer.write(inputStream, XContentType.JSON);
writer.write(inputStream, XContentType.JSON, (r, e) -> {});
verify(dataCountsReporter, times(1)).startNewIncrementalCount();
List<String[]> expectedRecords = new ArrayList<>();
@ -199,7 +199,7 @@ public class JsonDataToProcessWriterTests extends ESTestCase {
expectedRecords.add(new String[]{"3", "3.0", ""});
assertWrittenRecordsEqualTo(expectedRecords);
verify(dataCountsReporter).finishReporting();
verify(dataCountsReporter).finishReporting(any());
}
public void testWrite_GivenMalformedJsonThatNeverRecovers()
@ -216,7 +216,7 @@ public class JsonDataToProcessWriterTests extends ESTestCase {
writer.writeHeader();
ESTestCase.expectThrows(ElasticsearchParseException.class,
() -> writer.write(inputStream, XContentType.JSON));
() -> writer.write(inputStream, XContentType.JSON, (r, e) -> {}));
}
public void testWrite_GivenJsonWithArrayField()
@ -231,7 +231,7 @@ public class JsonDataToProcessWriterTests extends ESTestCase {
InputStream inputStream = createInputStream(input.toString());
JsonDataToProcessWriter writer = createWriter();
writer.writeHeader();
writer.write(inputStream, XContentType.JSON);
writer.write(inputStream, XContentType.JSON, (r, e) -> {});
verify(dataCountsReporter, times(1)).startNewIncrementalCount();
List<String[]> expectedRecords = new ArrayList<>();
@ -241,7 +241,7 @@ public class JsonDataToProcessWriterTests extends ESTestCase {
expectedRecords.add(new String[]{"2", "2.0", ""});
assertWrittenRecordsEqualTo(expectedRecords);
verify(dataCountsReporter).finishReporting();
verify(dataCountsReporter).finishReporting(any());
}
public void testWrite_GivenJsonWithMissingFields()
@ -260,7 +260,7 @@ public class JsonDataToProcessWriterTests extends ESTestCase {
InputStream inputStream = createInputStream(input.toString());
JsonDataToProcessWriter writer = createWriter();
writer.writeHeader();
writer.write(inputStream, XContentType.JSON);
writer.write(inputStream, XContentType.JSON, (r, e) -> {});
verify(dataCountsReporter, times(1)).startNewIncrementalCount();
List<String[]> expectedRecords = new ArrayList<>();
@ -278,7 +278,7 @@ public class JsonDataToProcessWriterTests extends ESTestCase {
verify(dataCountsReporter, times(1)).reportRecordWritten(1, 3000);
verify(dataCountsReporter, times(1)).reportRecordWritten(1, 4000);
verify(dataCountsReporter, times(1)).reportDateParseError(0);
verify(dataCountsReporter).finishReporting();
verify(dataCountsReporter).finishReporting(any());
}
private static InputStream createInputStream(String input) {