[ml] Use allocation id as key in `runningDatafeeds` map instead of datafeed id

Original commit: elastic/x-pack-elasticsearch@156e3275b1
Author: Martijn van Groningen, 2017-04-25 17:55:27 +02:00
Parent: 32128894a5
Commit: ee650b3189
3 changed files with 21 additions and 18 deletions
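
The change keys the runningDatafeeds registry by the persistent task's allocation id (a long) instead of the datafeed id (a String). An allocation id identifies one particular execution of the datafeed task, so entries for an old run and a restarted run of the same datafeed can no longer collide under one key. The sketch below only illustrates that keying pattern; RunningDatafeeds and its Holder are made-up stand-ins, not the x-pack classes.

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class RunningDatafeeds {

    static class Holder {
        final String datafeedId;
        Holder(String datafeedId) {
            this.datafeedId = datafeedId;
        }
    }

    // Keyed by allocation id: every (re)start of a datafeed task gets a fresh
    // allocation id, so two runs of the same datafeed id map to distinct entries.
    private final ConcurrentMap<Long, Holder> byAllocationId = new ConcurrentHashMap<>();

    void register(long allocationId, Holder holder) {
        byAllocationId.put(allocationId, holder);
    }

    Holder unregister(long allocationId) {
        return byAllocationId.remove(allocationId);
    }

    boolean isRunning(long allocationId) {
        return byAllocationId.containsKey(allocationId);
    }
}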

StartDatafeedAction.java

@@ -401,7 +401,7 @@ public class StartDatafeedAction
}
public void stop(String reason, TimeValue timeout) {
- datafeedManager.stopDatafeed(datafeedId, reason, timeout);
+ datafeedManager.stopDatafeed(this, reason, timeout);
}
}

DatafeedManager.java

@@ -37,7 +37,6 @@ import org.elasticsearch.xpack.persistent.PersistentTasksService;
import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -61,7 +60,8 @@ public class DatafeedManager extends AbstractComponent {
private final ThreadPool threadPool;
private final Supplier<Long> currentTimeSupplier;
private final Auditor auditor;
- private final ConcurrentMap<String, Holder> runningDatafeeds = new ConcurrentHashMap<>();
+ // Use allocationId as key instead of datafeed id
+ private final ConcurrentMap<Long, Holder> runningDatafeeds = new ConcurrentHashMap<>();
public DatafeedManager(ThreadPool threadPool, Client client, ClusterService clusterService, JobProvider jobProvider,
Supplier<Long> currentTimeSupplier, Auditor auditor, PersistentTasksService persistentTasksService) {
@@ -93,7 +93,7 @@ public class DatafeedManager extends AbstractComponent {
latestRecordTimeMs = dataCounts.getLatestRecordTimeStamp().getTime();
}
Holder holder = createJobDatafeed(datafeed, job, latestFinalBucketEndMs, latestRecordTimeMs, handler, task);
- runningDatafeeds.put(datafeedId, holder);
+ runningDatafeeds.put(task.getAllocationId(), holder);
task.updatePersistentStatus(DatafeedState.STARTED, new ActionListener<PersistentTask<?>>() {
@Override
public void onResponse(PersistentTask<?> persistentTask) {
@@ -108,8 +108,9 @@ public class DatafeedManager extends AbstractComponent {
}, handler);
}
- public void stopDatafeed(String datafeedId, String reason, TimeValue timeout) {
- Holder holder = runningDatafeeds.remove(datafeedId);
+ public void stopDatafeed(StartDatafeedAction.DatafeedTask task, String reason, TimeValue timeout) {
+ logger.info("[{}] attempt to stop datafeed [{}] [{}]", reason, task.getDatafeedId(), task.getAllocationId());
+ Holder holder = runningDatafeeds.remove(task.getAllocationId());
if (holder != null) {
holder.stop(reason, timeout, null);
}
@@ -121,8 +122,8 @@ public class DatafeedManager extends AbstractComponent {
logger.info("Closing [{}] datafeeds, because [{}]", numDatafeeds, reason);
}
- for (Map.Entry<String, Holder> entry : runningDatafeeds.entrySet()) {
- entry.getValue().stop(reason, TimeValue.timeValueSeconds(20), null);
+ for (Holder holder : runningDatafeeds.values()) {
+ holder.stop(reason, TimeValue.timeValueSeconds(20), null);
}
}
@@ -237,7 +238,7 @@ public class DatafeedManager extends AbstractComponent {
DataExtractorFactory dataExtractorFactory = createDataExtractorFactory(datafeed, job);
DatafeedJob datafeedJob = new DatafeedJob(job.getId(), buildDataDescription(job), frequency.toMillis(), queryDelay.toMillis(),
dataExtractorFactory, client, auditor, currentTimeSupplier, finalBucketEndMs, latestRecordTimeMs);
- return new Holder(task.getPersistentTaskId(), datafeed, datafeedJob, task.isLookbackOnly(),
+ return new Holder(task.getPersistentTaskId(), task.getAllocationId(), datafeed, datafeedJob, task.isLookbackOnly(),
new ProblemTracker(auditor, job.getId()), handler);
}
@@ -286,13 +287,14 @@ public class DatafeedManager extends AbstractComponent {
/**
* Visible for testing
*/
- boolean isRunning(String datafeedId) {
- return runningDatafeeds.containsKey(datafeedId);
+ boolean isRunning(long allocationId) {
+ return runningDatafeeds.containsKey(allocationId);
}
public class Holder {
private final String taskId;
+ private final long allocationId;
private final DatafeedConfig datafeed;
// To ensure that we wait until loopback / realtime search has completed before we stop the datafeed
private final ReentrantLock datafeedJobLock = new ReentrantLock(true);
@@ -302,9 +304,10 @@ public class DatafeedManager extends AbstractComponent {
private final Consumer<Exception> handler;
volatile Future<?> future;
- Holder(String taskId, DatafeedConfig datafeed, DatafeedJob datafeedJob, boolean autoCloseJob, ProblemTracker problemTracker,
- Consumer<Exception> handler) {
+ Holder(String taskId, long allocationId, DatafeedConfig datafeed, DatafeedJob datafeedJob, boolean autoCloseJob,
+ ProblemTracker problemTracker, Consumer<Exception> handler) {
this.taskId = taskId;
+ this.allocationId = allocationId;
this.datafeed = datafeed;
this.datafeedJob = datafeedJob;
this.autoCloseJob = autoCloseJob;
@@ -329,7 +332,7 @@ public class DatafeedManager extends AbstractComponent {
} finally {
logger.info("[{}] stopping datafeed [{}] for job [{}], acquired [{}]...", source, datafeed.getId(),
datafeed.getJobId(), acquired);
- runningDatafeeds.remove(datafeed.getId());
+ runningDatafeeds.remove(allocationId);
FutureUtils.cancel(future);
auditor.info(datafeed.getJobId(), Messages.getMessage(Messages.JOB_AUDIT_DATAFEED_STOPPED));
handler.accept(e);
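
In the hunk above, the Holder now remembers the allocation id it was registered under, and its stop path removes exactly that entry instead of removing by datafeed id. Below is a rough, self-contained sketch of just that idea, with invented names and none of the locking, future cancellation, or auditing the real Holder does.

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class HolderSketch {

    static class Holder {
        private final long allocationId;
        private final ConcurrentMap<Long, Holder> registry;

        Holder(long allocationId, ConcurrentMap<Long, Holder> registry) {
            this.allocationId = allocationId;
            this.registry = registry;
        }

        void stop() {
            // Only this run's entry is removed; a newer run of the same datafeed
            // id lives under a different allocation id and is left alone.
            registry.remove(allocationId);
        }
    }

    public static void main(String[] args) {
        ConcurrentMap<Long, Holder> running = new ConcurrentHashMap<>();
        Holder oldRun = new Holder(1L, running);
        Holder newRun = new Holder(2L, running);
        running.put(1L, oldRun);
        running.put(2L, newRun);
        oldRun.stop();
        System.out.println(running.containsKey(2L)); // true: the newer run survives
    }
}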

DatafeedManagerTests.java

@@ -293,7 +293,7 @@ public class DatafeedManagerTests extends ESTestCase {
verify(handler).accept(analysisProblemCaptor.capture());
assertThat(analysisProblemCaptor.getValue().getCause(), equalTo(conflictProblem));
verify(auditor).error("job_id", "Datafeed is encountering errors submitting data for analysis: conflict");
- assertThat(datafeedManager.isRunning(task.getDatafeedId()), is(false));
+ assertThat(datafeedManager.isRunning(task.getAllocationId()), is(false));
}
public void testRealTime_GivenPostAnalysisProblemIsNonConflict() throws Exception {
@@ -318,7 +318,7 @@ public class DatafeedManagerTests extends ESTestCase {
datafeedManager.run(task, handler);
verify(auditor).error("job_id", "Datafeed is encountering errors submitting data for analysis: just runtime");
- assertThat(datafeedManager.isRunning(task.getDatafeedId()), is(true));
+ assertThat(datafeedManager.isRunning(task.getAllocationId()), is(true));
}
public void testStart_GivenNewlyCreatedJobLoopBackAndRealtime() throws Exception {
@@ -344,13 +344,13 @@ public class DatafeedManagerTests extends ESTestCase {
if (cancelled) {
task.stop("test", StopDatafeedAction.DEFAULT_TIMEOUT);
verify(handler).accept(null);
- assertThat(datafeedManager.isRunning(task.getDatafeedId()), is(false));
+ assertThat(datafeedManager.isRunning(task.getAllocationId()), is(false));
} else {
verify(client).execute(same(PostDataAction.INSTANCE),
eq(createExpectedPostDataRequest("job_id", contentBytes, xContentType)));
verify(client).execute(same(FlushJobAction.INSTANCE), any());
verify(threadPool, times(1)).schedule(eq(new TimeValue(480100)), eq(MachineLearning.DATAFEED_THREAD_POOL_NAME), any());
- assertThat(datafeedManager.isRunning(task.getDatafeedId()), is(true));
+ assertThat(datafeedManager.isRunning(task.getAllocationId()), is(true));
}
}
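
The assertions now key isRunning on task.getAllocationId(), so the mocked DatafeedTask has to return a stable allocation id. The diff does not show the test fixtures; the snippet below is a guessed Mockito-style setup under that assumption, with mockTask and the stubbed values invented for illustration.

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

class DatafeedTaskMocks {
    // Hypothetical helper, not part of the actual DatafeedManagerTests;
    // StartDatafeedAction.DatafeedTask is the task type used in the diff above.
    static StartDatafeedAction.DatafeedTask mockTask(long allocationId, String datafeedId) {
        StartDatafeedAction.DatafeedTask task = mock(StartDatafeedAction.DatafeedTask.class);
        when(task.getAllocationId()).thenReturn(allocationId);
        when(task.getDatafeedId()).thenReturn(datafeedId);
        return task;
    }
}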