From ee650b318982121763bf9e8b1886a59f2f208dbb Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Tue, 25 Apr 2017 17:55:27 +0200 Subject: [PATCH] [ml] Use allocation id as key in `runningDatafeeds` map instead of datafeed id Original commit: elastic/x-pack-elasticsearch@156e3275b19a329b78b2ac1652f2db7987ad8e77 --- .../xpack/ml/action/StartDatafeedAction.java | 2 +- .../xpack/ml/datafeed/DatafeedManager.java | 29 ++++++++++--------- .../ml/datafeed/DatafeedManagerTests.java | 8 ++--- 3 files changed, 21 insertions(+), 18 deletions(-) diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StartDatafeedAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StartDatafeedAction.java index cf26bfc73d1..e5a0449de9b 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StartDatafeedAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StartDatafeedAction.java @@ -401,7 +401,7 @@ public class StartDatafeedAction } public void stop(String reason, TimeValue timeout) { - datafeedManager.stopDatafeed(datafeedId, reason, timeout); + datafeedManager.stopDatafeed(this, reason, timeout); } } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java index dec5fc148e3..1464b03bde8 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java @@ -37,7 +37,6 @@ import org.elasticsearch.xpack.persistent.PersistentTasksService; import java.time.Duration; import java.util.Collections; -import java.util.Map; import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -61,7 +60,8 @@ public class DatafeedManager extends AbstractComponent { private final ThreadPool threadPool; private final Supplier currentTimeSupplier; private final Auditor auditor; - private final ConcurrentMap runningDatafeeds = new ConcurrentHashMap<>(); + // Use allocationId as key instead of datafeed id + private final ConcurrentMap runningDatafeeds = new ConcurrentHashMap<>(); public DatafeedManager(ThreadPool threadPool, Client client, ClusterService clusterService, JobProvider jobProvider, Supplier currentTimeSupplier, Auditor auditor, PersistentTasksService persistentTasksService) { @@ -93,7 +93,7 @@ public class DatafeedManager extends AbstractComponent { latestRecordTimeMs = dataCounts.getLatestRecordTimeStamp().getTime(); } Holder holder = createJobDatafeed(datafeed, job, latestFinalBucketEndMs, latestRecordTimeMs, handler, task); - runningDatafeeds.put(datafeedId, holder); + runningDatafeeds.put(task.getAllocationId(), holder); task.updatePersistentStatus(DatafeedState.STARTED, new ActionListener>() { @Override public void onResponse(PersistentTask persistentTask) { @@ -108,8 +108,9 @@ public class DatafeedManager extends AbstractComponent { }, handler); } - public void stopDatafeed(String datafeedId, String reason, TimeValue timeout) { - Holder holder = runningDatafeeds.remove(datafeedId); + public void stopDatafeed(StartDatafeedAction.DatafeedTask task, String reason, TimeValue timeout) { + logger.info("[{}] attempt to stop datafeed [{}] [{}]", reason, task.getDatafeedId(), task.getAllocationId()); + Holder holder = runningDatafeeds.remove(task.getAllocationId()); if (holder != null) { holder.stop(reason, timeout, null); } @@ -121,8 +122,8 @@ public class DatafeedManager extends AbstractComponent { logger.info("Closing [{}] datafeeds, because [{}]", numDatafeeds, reason); } - for (Map.Entry entry : runningDatafeeds.entrySet()) { - entry.getValue().stop(reason, TimeValue.timeValueSeconds(20), null); + for (Holder holder : runningDatafeeds.values()) { + holder.stop(reason, TimeValue.timeValueSeconds(20), null); } } @@ -237,7 +238,7 @@ public class DatafeedManager extends AbstractComponent { DataExtractorFactory dataExtractorFactory = createDataExtractorFactory(datafeed, job); DatafeedJob datafeedJob = new DatafeedJob(job.getId(), buildDataDescription(job), frequency.toMillis(), queryDelay.toMillis(), dataExtractorFactory, client, auditor, currentTimeSupplier, finalBucketEndMs, latestRecordTimeMs); - return new Holder(task.getPersistentTaskId(), datafeed, datafeedJob, task.isLookbackOnly(), + return new Holder(task.getPersistentTaskId(), task.getAllocationId(), datafeed, datafeedJob, task.isLookbackOnly(), new ProblemTracker(auditor, job.getId()), handler); } @@ -286,13 +287,14 @@ public class DatafeedManager extends AbstractComponent { /** * Visible for testing */ - boolean isRunning(String datafeedId) { - return runningDatafeeds.containsKey(datafeedId); + boolean isRunning(long allocationId) { + return runningDatafeeds.containsKey(allocationId); } public class Holder { private final String taskId; + private final long allocationId; private final DatafeedConfig datafeed; // To ensure that we wait until loopback / realtime search has completed before we stop the datafeed private final ReentrantLock datafeedJobLock = new ReentrantLock(true); @@ -302,9 +304,10 @@ public class DatafeedManager extends AbstractComponent { private final Consumer handler; volatile Future future; - Holder(String taskId, DatafeedConfig datafeed, DatafeedJob datafeedJob, boolean autoCloseJob, ProblemTracker problemTracker, - Consumer handler) { + Holder(String taskId, long allocationId, DatafeedConfig datafeed, DatafeedJob datafeedJob, boolean autoCloseJob, + ProblemTracker problemTracker, Consumer handler) { this.taskId = taskId; + this.allocationId = allocationId; this.datafeed = datafeed; this.datafeedJob = datafeedJob; this.autoCloseJob = autoCloseJob; @@ -329,7 +332,7 @@ public class DatafeedManager extends AbstractComponent { } finally { logger.info("[{}] stopping datafeed [{}] for job [{}], acquired [{}]...", source, datafeed.getId(), datafeed.getJobId(), acquired); - runningDatafeeds.remove(datafeed.getId()); + runningDatafeeds.remove(allocationId); FutureUtils.cancel(future); auditor.info(datafeed.getJobId(), Messages.getMessage(Messages.JOB_AUDIT_DATAFEED_STOPPED)); handler.accept(e); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManagerTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManagerTests.java index 8ec5cb83c75..5541c40d991 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManagerTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManagerTests.java @@ -293,7 +293,7 @@ public class DatafeedManagerTests extends ESTestCase { verify(handler).accept(analysisProblemCaptor.capture()); assertThat(analysisProblemCaptor.getValue().getCause(), equalTo(conflictProblem)); verify(auditor).error("job_id", "Datafeed is encountering errors submitting data for analysis: conflict"); - assertThat(datafeedManager.isRunning(task.getDatafeedId()), is(false)); + assertThat(datafeedManager.isRunning(task.getAllocationId()), is(false)); } public void testRealTime_GivenPostAnalysisProblemIsNonConflict() throws Exception { @@ -318,7 +318,7 @@ public class DatafeedManagerTests extends ESTestCase { datafeedManager.run(task, handler); verify(auditor).error("job_id", "Datafeed is encountering errors submitting data for analysis: just runtime"); - assertThat(datafeedManager.isRunning(task.getDatafeedId()), is(true)); + assertThat(datafeedManager.isRunning(task.getAllocationId()), is(true)); } public void testStart_GivenNewlyCreatedJobLoopBackAndRealtime() throws Exception { @@ -344,13 +344,13 @@ public class DatafeedManagerTests extends ESTestCase { if (cancelled) { task.stop("test", StopDatafeedAction.DEFAULT_TIMEOUT); verify(handler).accept(null); - assertThat(datafeedManager.isRunning(task.getDatafeedId()), is(false)); + assertThat(datafeedManager.isRunning(task.getAllocationId()), is(false)); } else { verify(client).execute(same(PostDataAction.INSTANCE), eq(createExpectedPostDataRequest("job_id", contentBytes, xContentType))); verify(client).execute(same(FlushJobAction.INSTANCE), any()); verify(threadPool, times(1)).schedule(eq(new TimeValue(480100)), eq(MachineLearning.DATAFEED_THREAD_POOL_NAME), any()); - assertThat(datafeedManager.isRunning(task.getDatafeedId()), is(true)); + assertThat(datafeedManager.isRunning(task.getAllocationId()), is(true)); } }