Scheduler robustness improvements (elastic/elasticsearch#687)

* Extract method ScheduledJob#postData

* Remove unreachable else statement

* Restrict usage of DataExtractor to a single thread

Original commit: elastic/x-pack-elasticsearch@5b9b310d9d
Dimitris Athanasiou, 2017-01-10 17:09:01 +00:00, committed by GitHub
parent 51c50c5840
commit 3657d8a137
5 changed files with 37 additions and 19 deletions
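
The third bullet is the heart of the change: stop() no longer reaches into a shared volatile dataExtractor field from another thread. The extractor now exists only as a local variable on the extraction thread, and stop() merely flips the volatile running flag, which the loop observes and translates into a cancel() call on the extractor it owns. A minimal sketch of that pattern, with hypothetical names standing in for the real ScheduledJob/DataExtractor scaffolding that is not part of this diff:

    import java.io.InputStream;
    import java.util.Optional;

    // Hypothetical stand-ins; only the cancellation pattern from this commit is illustrated.
    interface Extractor {
        boolean hasNext();
        Optional<InputStream> next() throws Exception;
        boolean isCancelled();
        void cancel();
    }

    class SingleThreadedConsumer {

        private volatile boolean running = true;   // the only state stop() touches

        // May be called from another thread; it never touches the extractor itself.
        void stop() {
            running = false;
        }

        // Runs on the extraction thread; the extractor never escapes this method.
        long consume(Extractor extractor) throws Exception {
            long batches = 0;
            while (extractor.hasNext()) {
                // Cooperative cancellation: this thread notices the flag and asks
                // the extractor it owns to wind down.
                if (!running && !extractor.isCancelled()) {
                    extractor.cancel();
                }
                Optional<InputStream> batch = extractor.next();
                if (batch.isPresent()) {
                    try (InputStream in = batch.get()) {
                        post(in);   // hypothetical; mirrors the extracted postData(in)
                        batches++;
                    }
                }
            }
            return batches;
        }

        private void post(InputStream in) {
            // Placeholder: the real code copies the stream and sends it via PostDataAction.
        }
    }

With the extractor confined to one thread, volatile is only needed on the flag itself, which is why the ScrollDataExtractor fields further down can drop the keyword.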

ScheduledJob.java

@@ -23,6 +23,7 @@ import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.Optional;
+import java.util.concurrent.ExecutionException;
 import java.util.function.Supplier;

 class ScheduledJob {

@@ -38,7 +39,6 @@ class ScheduledJob {
     private final DataExtractorFactory dataExtractorFactory;
     private final Supplier<Long> currentTimeSupplier;
-    private volatile DataExtractor dataExtractor;
     private volatile long lookbackStartTimeMs;
     private volatile Long lastEndTimeMs;
     private volatile boolean running = true;

@@ -104,9 +104,6 @@
     }

     public void stop() {
-        if (dataExtractor != null) {
-            dataExtractor.cancel();
-        }
         running = false;
         auditor.info(Messages.getMessage(Messages.JOB_AUDIT_SCHEDULER_STOPPED));
     }

@@ -124,8 +121,12 @@
         RuntimeException error = null;
         long recordCount = 0;
-        dataExtractor = dataExtractorFactory.newExtractor(start, end);
+        DataExtractor dataExtractor = dataExtractorFactory.newExtractor(start, end);
         while (dataExtractor.hasNext()) {
+            if (!isRunning() && !dataExtractor.isCancelled()) {
+                dataExtractor.cancel();
+            }
             Optional<InputStream> extractedData;
             try {
                 extractedData = dataExtractor.next();

@@ -136,12 +137,7 @@
             if (extractedData.isPresent()) {
                 DataCounts counts;
                 try (InputStream in = extractedData.get()) {
-                    PostDataAction.Request request = new PostDataAction.Request(jobId);
-                    ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
-                    Streams.copy(in, outputStream);
-                    request.setContent(new BytesArray(outputStream.toByteArray()));
-                    PostDataAction.Response response = client.execute(PostDataAction.INSTANCE, request).get();
-                    counts = response.getDataCounts();
+                    counts = postData(in);
                 } catch (Exception e) {
                     if (e instanceof InterruptedException) {
                         Thread.currentThread().interrupt();

@@ -155,7 +151,6 @@
                 }
             }
         }
-        dataExtractor = null;
         lastEndTimeMs = Math.max(lastEndTimeMs == null ? 0 : lastEndTimeMs, end - 1);

@@ -179,6 +174,15 @@
         }
     }

+    private DataCounts postData(InputStream inputStream) throws IOException, ExecutionException, InterruptedException {
+        PostDataAction.Request request = new PostDataAction.Request(jobId);
+        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+        Streams.copy(inputStream, outputStream);
+        request.setContent(new BytesArray(outputStream.toByteArray()));
+        PostDataAction.Response response = client.execute(PostDataAction.INSTANCE, request).get();
+        return response.getDataCounts();
+    }
+
     private long nextRealtimeTimestamp() {
         long epochMs = currentTimeSupplier.get() + frequencyMs;
         return toIntervalStartEpochMs(epochMs) + NEXT_TASK_DELAY_MS;

@@ -195,7 +199,6 @@
         AnalysisProblemException(Throwable cause) {
            super(cause);
         }
     }

     class ExtractionProblemException extends RuntimeException {

ScheduledJobRunner.java

@@ -155,8 +155,6 @@ public class ScheduledJobRunner extends AbstractComponent {
                 holder.problemTracker.finishReport();
                 doScheduleRealtime(nextDelayInMsSinceEpoch, jobId, holder);
             });
-        } else {
-            holder.stop(null);
         }
     }

DataExtractor.java

@@ -24,6 +24,11 @@ public interface DataExtractor {
      */
     Optional<InputStream> next() throws IOException;

+    /**
+     * @return {@code true} if the extractor has been cancelled, or {@code false} otherwise
+     */
+    boolean isCancelled();
+
     /**
      * Cancel the current search.
      */
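
For reference, the DataExtractor contract after this commit looks roughly as follows. It is assembled only from what is visible in this commit's hunks (hasNext() is inferred from ScheduledJob's loop and ScrollDataExtractor); any other members of the real interface are omitted:

    import java.io.IOException;
    import java.io.InputStream;
    import java.util.Optional;

    // Reconstructed view for reference; not the complete source file.
    public interface DataExtractor {

        /** @return true while more batches may still be returned by next() */
        boolean hasNext();

        /** @return the next batch of extracted data, if any */
        Optional<InputStream> next() throws IOException;

        /** @return {@code true} if the extractor has been cancelled, or {@code false} otherwise */
        boolean isCancelled();

        /** Cancel the current search. */
        void cancel();
    }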

ScrollDataExtractor.java

@@ -37,6 +37,12 @@ import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.TimeUnit;

+/**
+ * An implementation that extracts data from elasticsearch using search and scroll on a client.
+ * It supports safe and responsive cancellation by continuing the scroll until a new timestamp
+ * is seen.
+ * Note that this class is NOT thread-safe.
+ */
 class ScrollDataExtractor implements DataExtractor {

     private static final Logger LOGGER = Loggers.getLogger(ScrollDataExtractor.class);

@@ -44,10 +50,10 @@ class ScrollDataExtractor implements DataExtractor {
     private final Client client;
     private final ScrollDataExtractorContext context;
-    private volatile String scrollId;
-    private volatile boolean isCancelled;
-    private volatile boolean hasNext;
-    private volatile Long timestampOnCancel;
+    private String scrollId;
+    private boolean isCancelled;
+    private boolean hasNext;
+    private Long timestampOnCancel;

     public ScrollDataExtractor(Client client, ScrollDataExtractorContext dataExtractorContext) {
         this.client = Objects.requireNonNull(client);

@@ -60,6 +66,11 @@ class ScrollDataExtractor implements DataExtractor {
         return hasNext;
     }

+    @Override
+    public boolean isCancelled() {
+        return isCancelled;
+    }
+
     @Override
     public void cancel() {
         LOGGER.trace("[{}] Data extractor received cancel request", context.jobId);
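
The new class javadoc says cancellation works by "continuing the scroll until a new timestamp is seen", presumably so that every record sharing the in-flight timestamp is still delivered and no time bucket is cut short. The implementation of that behaviour is not part of this diff; the following is only a guess at how the timestampOnCancel field could support the contract, and all names other than isCancelled, cancel and timestampOnCancel are hypothetical:

    // Hypothetical sketch, NOT the actual ScrollDataExtractor implementation.
    class TimestampBoundaryCancellation {

        private boolean isCancelled;
        private boolean hasNext = true;
        private Long timestampOnCancel;   // timestamp in flight when cancel() arrived

        void cancel() {
            // Scrolling is allowed to continue; accept() below decides when to stop.
            isCancelled = true;
        }

        boolean hasNext() {
            return hasNext;
        }

        /** Called per record pulled from the scroll; returns false once extraction should stop. */
        boolean accept(long recordTimestamp) {
            if (isCancelled) {
                if (timestampOnCancel == null) {
                    // Remember the timestamp we were on when the cancel request arrived.
                    timestampOnCancel = recordTimestamp;
                } else if (recordTimestamp != timestampOnCancel) {
                    // A new timestamp means everything for the previous one has been
                    // delivered, so it is now safe to stop the scroll.
                    hasNext = false;
                    return false;
                }
            }
            return true;
        }
    }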

ScrollDataExtractorTests.java

@@ -213,6 +213,7 @@ public class ScrollDataExtractorTests extends ESTestCase {
         );
         extractor.setNextResponse(response2);

+        assertThat(extractor.isCancelled(), is(true));
         assertThat(extractor.hasNext(), is(true));
         stream = extractor.next();
         assertThat(stream.isPresent(), is(true));