[ML] Enforce the stop datafeed aoi to wait until any operation that the datafeed is doing has completed.

Original commit: elastic/x-pack-elasticsearch@e8974191a2
This commit is contained in:
Martijn van Groningen 2017-03-01 15:14:32 +01:00
parent e492b17c10
commit 61ca6d435f
3 changed files with 54 additions and 11 deletions

View File

@ -49,11 +49,11 @@ import org.elasticsearch.xpack.ml.job.config.JobState;
import org.elasticsearch.xpack.ml.notifications.Auditor;
import org.elasticsearch.xpack.ml.utils.DatafeedStateObserver;
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.persistent.NodePersistentTask;
import org.elasticsearch.xpack.persistent.PersistentActionRegistry;
import org.elasticsearch.xpack.persistent.PersistentActionRequest;
import org.elasticsearch.xpack.persistent.PersistentActionResponse;
import org.elasticsearch.xpack.persistent.PersistentActionService;
import org.elasticsearch.xpack.persistent.NodePersistentTask;
import org.elasticsearch.xpack.persistent.PersistentTasks;
import org.elasticsearch.xpack.persistent.PersistentTasks.Assignment;
import org.elasticsearch.xpack.persistent.PersistentTasks.PersistentTask;
@ -265,12 +265,15 @@ public class StartDatafeedAction
stop();
}
/* public for testing */
public void stop() {
stop(TimeValue.timeValueSeconds(20));
}
public void stop(TimeValue timeout) {
if (holder == null) {
throw new IllegalStateException("task cancel ran before datafeed runner assigned the holder");
}
holder.stop("cancel", null);
holder.stop("cancel", timeout, null);
}
}

View File

@ -99,6 +99,7 @@ public class StopDatafeedAction
public Request(String jobId) {
this.datafeedId = ExceptionsHelper.requireNonNull(jobId, DatafeedConfig.ID.getPreferredName());
setActions(StartDatafeedAction.NAME);
setTimeout(TimeValue.timeValueSeconds(20));
}
Request() {
@ -251,7 +252,7 @@ public class StopDatafeedAction
@Override
protected void taskOperation(Request request, StartDatafeedAction.DatafeedTask task, ActionListener<Response> listener) {
task.stop();
task.stop(request.getTimeout());
listener.onResponse(new Response(true));
}

View File

@ -41,6 +41,8 @@ import java.time.Duration;
import java.util.Collections;
import java.util.Objects;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Supplier;
@ -113,7 +115,7 @@ public class DatafeedJobRunner extends AbstractComponent {
holder.future = threadPool.executor(MachineLearning.DATAFEED_RUNNER_THREAD_POOL_NAME).submit(() -> {
Long next = null;
try {
next = holder.datafeedJob.runLookBack(startTime, endTime);
next = holder.executeLoopBack(startTime, endTime);
} catch (DatafeedJob.ExtractionProblemException e) {
if (endTime == null) {
next = e.nextDelayInMsSinceEpoch;
@ -136,13 +138,13 @@ public class DatafeedJobRunner extends AbstractComponent {
}
} catch (Exception e) {
logger.error("Failed lookback import for job [" + holder.datafeed.getJobId() + "]", e);
holder.stop("general_lookback_failure", e);
holder.stop("general_lookback_failure", TimeValue.timeValueSeconds(20), e);
return;
}
if (next != null) {
doDatafeedRealtime(next, holder.datafeed.getJobId(), holder);
} else {
holder.stop("no_realtime", null);
holder.stop("no_realtime", TimeValue.timeValueSeconds(20), null);
holder.problemTracker.finishReport();
}
});
@ -155,7 +157,7 @@ public class DatafeedJobRunner extends AbstractComponent {
holder.future = threadPool.schedule(delay, MachineLearning.DATAFEED_RUNNER_THREAD_POOL_NAME, () -> {
long nextDelayInMsSinceEpoch;
try {
nextDelayInMsSinceEpoch = holder.datafeedJob.runRealtime();
nextDelayInMsSinceEpoch = holder.executeRealTime();
holder.problemTracker.reportNoneEmptyCount();
} catch (DatafeedJob.ExtractionProblemException e) {
nextDelayInMsSinceEpoch = e.nextDelayInMsSinceEpoch;
@ -168,11 +170,13 @@ public class DatafeedJobRunner extends AbstractComponent {
holder.problemTracker.reportEmptyDataCount();
} catch (Exception e) {
logger.error("Unexpected datafeed failure for job [" + jobId + "] stopping...", e);
holder.stop("general_realtime_error", e);
holder.stop("general_realtime_error", TimeValue.timeValueSeconds(20), e);
return;
}
holder.problemTracker.finishReport();
if (nextDelayInMsSinceEpoch >= 0) {
doDatafeedRealtime(nextDelayInMsSinceEpoch, jobId, holder);
}
});
}
}
@ -241,6 +245,8 @@ public class DatafeedJobRunner extends AbstractComponent {
public class Holder {
private final DatafeedConfig datafeed;
// To ensure that we wait until loopback / realtime search has completed before we stop the datafeed
private final ReentrantLock datafeedJobLock = new ReentrantLock(true);
private final DatafeedJob datafeedJob;
private final boolean autoCloseJob;
private final ProblemTracker problemTracker;
@ -260,10 +266,12 @@ public class DatafeedJobRunner extends AbstractComponent {
return datafeedJob.isRunning();
}
public void stop(String source, Exception e) {
public void stop(String source, TimeValue timeout, Exception e) {
logger.info("[{}] attempt to stop datafeed [{}] for job [{}]", source, datafeed.getId(), datafeed.getJobId());
if (datafeedJob.stop()) {
boolean acquired = false;
try {
acquired = datafeedJobLock.tryLock(timeout.millis(), TimeUnit.MILLISECONDS);
logger.info("[{}] stopping datafeed [{}] for job [{}]...", source, datafeed.getId(), datafeed.getJobId());
FutureUtils.cancel(future);
auditor.info(datafeed.getJobId(), Messages.getMessage(Messages.JOB_AUDIT_DATAFEED_STOPPED));
@ -271,14 +279,45 @@ public class DatafeedJobRunner extends AbstractComponent {
if (autoCloseJob) {
closeJob();
}
} catch (InterruptedException e1) {
Thread.currentThread().interrupt();
} finally {
handler.accept(e);
if (acquired) {
datafeedJobLock.unlock();
}
}
} else {
logger.info("[{}] datafeed [{}] for job [{}] was already stopped", source, datafeed.getId(), datafeed.getJobId());
}
}
private Long executeLoopBack(long startTime, Long endTime) throws Exception {
datafeedJobLock.lock();
try {
if (isRunning()) {
return datafeedJob.runLookBack(startTime, endTime);
} else {
return null;
}
} finally {
datafeedJobLock.unlock();
}
}
private long executeRealTime() throws Exception {
datafeedJobLock.lock();
try {
if (isRunning()) {
return datafeedJob.runRealtime();
} else {
return -1L;
}
} finally {
datafeedJobLock.unlock();
}
}
private void closeJob() {
CloseJobAction.Request closeJobRequest = new CloseJobAction.Request(datafeed.getJobId());
client.execute(CloseJobAction.INSTANCE, closeJobRequest, new ActionListener<CloseJobAction.Response>() {