[ML] Renamed DatafeedJobRunner to DatafeedManager

Original commit: elastic/x-pack-elasticsearch@1228488a2e
This commit is contained in:
Martijn van Groningen 2017-04-04 14:55:49 +02:00
parent 4f1115d7f5
commit 8a87a91897
10 changed files with 55 additions and 55 deletions

View File

@ -128,7 +128,7 @@
<suppress files="plugin[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]xpack[/\\]ml[/\\]datafeed[/\\]ChunkingConfig.java" checks="LineLength" /> <suppress files="plugin[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]xpack[/\\]ml[/\\]datafeed[/\\]ChunkingConfig.java" checks="LineLength" />
<suppress files="plugin[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]xpack[/\\]ml[/\\]datafeed[/\\]DatafeedConfig.java" checks="LineLength" /> <suppress files="plugin[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]xpack[/\\]ml[/\\]datafeed[/\\]DatafeedConfig.java" checks="LineLength" />
<suppress files="plugin[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]xpack[/\\]ml[/\\]datafeed[/\\]DatafeedJob.java" checks="LineLength" /> <suppress files="plugin[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]xpack[/\\]ml[/\\]datafeed[/\\]DatafeedJob.java" checks="LineLength" />
<suppress files="plugin[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]xpack[/\\]ml[/\\]datafeed[/\\]DatafeedJobRunner.java" checks="LineLength" /> <suppress files="plugin[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]xpack[/\\]ml[/\\]datafeed[/\\]DatafeedManager.java" checks="LineLength" />
<suppress files="plugin[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]xpack[/\\]ml[/\\]datafeed[/\\]DatafeedJobValidator.java" checks="LineLength" /> <suppress files="plugin[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]xpack[/\\]ml[/\\]datafeed[/\\]DatafeedJobValidator.java" checks="LineLength" />
<suppress files="plugin[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]xpack[/\\]ml[/\\]datafeed[/\\]DatafeedUpdate.java" checks="LineLength" /> <suppress files="plugin[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]xpack[/\\]ml[/\\]datafeed[/\\]DatafeedUpdate.java" checks="LineLength" />
<suppress files="plugin[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]xpack[/\\]ml[/\\]datafeed[/\\]extractor[/\\]DataExtractorFactory.java" checks="LineLength" /> <suppress files="plugin[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]xpack[/\\]ml[/\\]datafeed[/\\]extractor[/\\]DataExtractorFactory.java" checks="LineLength" />
@ -878,7 +878,7 @@
<suppress files="plugin[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]xpack[/\\]ml[/\\]action[/\\]util[/\\]QueryPageTests.java" checks="LineLength" /> <suppress files="plugin[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]xpack[/\\]ml[/\\]action[/\\]util[/\\]QueryPageTests.java" checks="LineLength" />
<suppress files="plugin[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]xpack[/\\]ml[/\\]datafeed[/\\]ChunkingConfigTests.java" checks="LineLength" /> <suppress files="plugin[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]xpack[/\\]ml[/\\]datafeed[/\\]ChunkingConfigTests.java" checks="LineLength" />
<suppress files="plugin[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]xpack[/\\]ml[/\\]datafeed[/\\]DatafeedConfigTests.java" checks="LineLength" /> <suppress files="plugin[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]xpack[/\\]ml[/\\]datafeed[/\\]DatafeedConfigTests.java" checks="LineLength" />
<suppress files="plugin[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]xpack[/\\]ml[/\\]datafeed[/\\]DatafeedJobRunnerTests.java" checks="LineLength" /> <suppress files="plugin[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]xpack[/\\]ml[/\\]datafeed[/\\]DatafeedManagerTests.java" checks="LineLength" />
<suppress files="plugin[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]xpack[/\\]ml[/\\]datafeed[/\\]DatafeedJobTests.java" checks="LineLength" /> <suppress files="plugin[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]xpack[/\\]ml[/\\]datafeed[/\\]DatafeedJobTests.java" checks="LineLength" />
<suppress files="plugin[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]xpack[/\\]ml[/\\]datafeed[/\\]DatafeedJobValidatorTests.java" checks="LineLength" /> <suppress files="plugin[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]xpack[/\\]ml[/\\]datafeed[/\\]DatafeedJobValidatorTests.java" checks="LineLength" />
<suppress files="plugin[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]xpack[/\\]ml[/\\]datafeed[/\\]DatafeedUpdateTests.java" checks="LineLength" /> <suppress files="plugin[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]xpack[/\\]ml[/\\]datafeed[/\\]DatafeedUpdateTests.java" checks="LineLength" />

View File

@ -10,22 +10,22 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.license.XPackLicenseState; import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.ml.datafeed.DatafeedJobRunner; import org.elasticsearch.xpack.ml.datafeed.DatafeedManager;
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager; import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager;
public class InvalidLicenseEnforcer extends AbstractComponent { public class InvalidLicenseEnforcer extends AbstractComponent {
private final ThreadPool threadPool; private final ThreadPool threadPool;
private final XPackLicenseState licenseState; private final XPackLicenseState licenseState;
private final DatafeedJobRunner datafeedJobRunner; private final DatafeedManager datafeedManager;
private final AutodetectProcessManager autodetectProcessManager; private final AutodetectProcessManager autodetectProcessManager;
InvalidLicenseEnforcer(Settings settings, XPackLicenseState licenseState, ThreadPool threadPool, InvalidLicenseEnforcer(Settings settings, XPackLicenseState licenseState, ThreadPool threadPool,
DatafeedJobRunner datafeedJobRunner, AutodetectProcessManager autodetectProcessManager) { DatafeedManager datafeedManager, AutodetectProcessManager autodetectProcessManager) {
super(settings); super(settings);
this.threadPool = threadPool; this.threadPool = threadPool;
this.licenseState = licenseState; this.licenseState = licenseState;
this.datafeedJobRunner = datafeedJobRunner; this.datafeedManager = datafeedManager;
this.autodetectProcessManager = autodetectProcessManager; this.autodetectProcessManager = autodetectProcessManager;
licenseState.addListener(this::closeJobsAndDatafeedsIfLicenseExpired); licenseState.addListener(this::closeJobsAndDatafeedsIfLicenseExpired);
} }
@ -40,7 +40,7 @@ public class InvalidLicenseEnforcer extends AbstractComponent {
@Override @Override
protected void doRun() throws Exception { protected void doRun() throws Exception {
datafeedJobRunner.stopAllDatafeeds("invalid license"); datafeedManager.stopAllDatafeeds("invalid license");
autodetectProcessManager.closeAllJobs("invalid license"); autodetectProcessManager.closeAllJobs("invalid license");
} }
}); });

View File

@ -70,7 +70,7 @@ import org.elasticsearch.xpack.ml.action.UpdateModelSnapshotAction;
import org.elasticsearch.xpack.ml.action.UpdateProcessAction; import org.elasticsearch.xpack.ml.action.UpdateProcessAction;
import org.elasticsearch.xpack.ml.action.ValidateDetectorAction; import org.elasticsearch.xpack.ml.action.ValidateDetectorAction;
import org.elasticsearch.xpack.ml.action.ValidateJobConfigAction; import org.elasticsearch.xpack.ml.action.ValidateJobConfigAction;
import org.elasticsearch.xpack.ml.datafeed.DatafeedJobRunner; import org.elasticsearch.xpack.ml.datafeed.DatafeedManager;
import org.elasticsearch.xpack.ml.datafeed.DatafeedState; import org.elasticsearch.xpack.ml.datafeed.DatafeedState;
import org.elasticsearch.xpack.ml.job.JobManager; import org.elasticsearch.xpack.ml.job.JobManager;
import org.elasticsearch.xpack.ml.job.UpdateJobProcessNotifier; import org.elasticsearch.xpack.ml.job.UpdateJobProcessNotifier;
@ -299,16 +299,16 @@ public class MachineLearning implements ActionPlugin {
jobManager, jobProvider, jobResultsPersister, jobDataCountsPersister, autodetectProcessFactory, jobManager, jobProvider, jobResultsPersister, jobDataCountsPersister, autodetectProcessFactory,
normalizerFactory, xContentRegistry); normalizerFactory, xContentRegistry);
PersistentTasksService persistentTasksService = new PersistentTasksService(settings, clusterService, threadPool, internalClient); PersistentTasksService persistentTasksService = new PersistentTasksService(settings, clusterService, threadPool, internalClient);
DatafeedJobRunner datafeedJobRunner = new DatafeedJobRunner(threadPool, internalClient, clusterService, jobProvider, DatafeedManager datafeedManager = new DatafeedManager(threadPool, internalClient, clusterService, jobProvider,
System::currentTimeMillis, auditor, persistentTasksService); System::currentTimeMillis, auditor, persistentTasksService);
InvalidLicenseEnforcer invalidLicenseEnforcer = InvalidLicenseEnforcer invalidLicenseEnforcer =
new InvalidLicenseEnforcer(settings, licenseState, threadPool, datafeedJobRunner, autodetectProcessManager); new InvalidLicenseEnforcer(settings, licenseState, threadPool, datafeedManager, autodetectProcessManager);
PersistentTasksExecutorRegistry persistentTasksExecutorRegistry = new PersistentTasksExecutorRegistry(Settings.EMPTY, Arrays.asList( PersistentTasksExecutorRegistry persistentTasksExecutorRegistry = new PersistentTasksExecutorRegistry(Settings.EMPTY, Arrays.asList(
new OpenJobAction.OpenJobPersistentTasksExecutor(settings, threadPool, licenseState, persistentTasksService, clusterService, new OpenJobAction.OpenJobPersistentTasksExecutor(settings, threadPool, licenseState, persistentTasksService, clusterService,
autodetectProcessManager, auditor), autodetectProcessManager, auditor),
new StartDatafeedAction.StartDatafeedPersistentTasksExecutor(settings, threadPool, licenseState, persistentTasksService, new StartDatafeedAction.StartDatafeedPersistentTasksExecutor(settings, threadPool, licenseState, persistentTasksService,
datafeedJobRunner, auditor) datafeedManager, auditor)
)); ));
return Arrays.asList( return Arrays.asList(
@ -319,7 +319,7 @@ public class MachineLearning implements ActionPlugin {
new MachineLearningTemplateRegistry(settings, clusterService, internalClient, threadPool), new MachineLearningTemplateRegistry(settings, clusterService, internalClient, threadPool),
new MlInitializationService(settings, threadPool, clusterService, internalClient), new MlInitializationService(settings, threadPool, clusterService, internalClient),
jobDataCountsPersister, jobDataCountsPersister,
datafeedJobRunner, datafeedManager,
persistentTasksService, persistentTasksService,
persistentTasksExecutorRegistry, persistentTasksExecutorRegistry,
new PersistentTasksClusterService(Settings.EMPTY, persistentTasksExecutorRegistry, clusterService), new PersistentTasksClusterService(Settings.EMPTY, persistentTasksExecutorRegistry, clusterService),

View File

@ -44,8 +44,8 @@ import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.XPackPlugin; import org.elasticsearch.xpack.XPackPlugin;
import org.elasticsearch.xpack.ml.MlMetadata; import org.elasticsearch.xpack.ml.MlMetadata;
import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.ml.datafeed.DatafeedJobRunner;
import org.elasticsearch.xpack.ml.datafeed.DatafeedJobValidator; import org.elasticsearch.xpack.ml.datafeed.DatafeedJobValidator;
import org.elasticsearch.xpack.ml.datafeed.DatafeedManager;
import org.elasticsearch.xpack.ml.datafeed.DatafeedState; import org.elasticsearch.xpack.ml.datafeed.DatafeedState;
import org.elasticsearch.xpack.ml.job.config.Job; import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.config.JobState; import org.elasticsearch.xpack.ml.job.config.JobState;
@ -296,7 +296,7 @@ public class StartDatafeedAction
private final long startTime; private final long startTime;
private final Long endTime; private final Long endTime;
/* only pck protected for testing */ /* only pck protected for testing */
volatile DatafeedJobRunner datafeedJobRunner; volatile DatafeedManager datafeedManager;
DatafeedTask(long id, String type, String action, TaskId parentTaskId, Request request) { DatafeedTask(long id, String type, String action, TaskId parentTaskId, Request request) {
super(id, type, action, "datafeed-" + request.getDatafeedId(), parentTaskId); super(id, type, action, "datafeed-" + request.getDatafeedId(), parentTaskId);
@ -332,7 +332,7 @@ public class StartDatafeedAction
} }
public void stop(String reason, TimeValue timeout) { public void stop(String reason, TimeValue timeout) {
datafeedJobRunner.stopDatafeed(datafeedId, reason, timeout); datafeedManager.stopDatafeed(datafeedId, reason, timeout);
} }
} }
@ -394,17 +394,17 @@ public class StartDatafeedAction
} }
public static class StartDatafeedPersistentTasksExecutor extends PersistentTasksExecutor<Request> { public static class StartDatafeedPersistentTasksExecutor extends PersistentTasksExecutor<Request> {
private final DatafeedJobRunner datafeedJobRunner; private final DatafeedManager datafeedManager;
private final XPackLicenseState licenseState; private final XPackLicenseState licenseState;
private final Auditor auditor; private final Auditor auditor;
private final ThreadPool threadPool; private final ThreadPool threadPool;
public StartDatafeedPersistentTasksExecutor(Settings settings, ThreadPool threadPool, XPackLicenseState licenseState, public StartDatafeedPersistentTasksExecutor(Settings settings, ThreadPool threadPool, XPackLicenseState licenseState,
PersistentTasksService persistentTasksService, DatafeedJobRunner datafeedJobRunner, PersistentTasksService persistentTasksService, DatafeedManager datafeedManager,
Auditor auditor) { Auditor auditor) {
super(settings, NAME, persistentTasksService, ThreadPool.Names.MANAGEMENT); super(settings, NAME, persistentTasksService, ThreadPool.Names.MANAGEMENT);
this.licenseState = licenseState; this.licenseState = licenseState;
this.datafeedJobRunner = datafeedJobRunner; this.datafeedManager = datafeedManager;
this.auditor = auditor; this.auditor = auditor;
this.threadPool = threadPool; this.threadPool = threadPool;
} }
@ -431,8 +431,8 @@ public class StartDatafeedAction
protected void nodeOperation(AllocatedPersistentTask allocatedPersistentTask, Request request, protected void nodeOperation(AllocatedPersistentTask allocatedPersistentTask, Request request,
ActionListener<TransportResponse.Empty> listener) { ActionListener<TransportResponse.Empty> listener) {
DatafeedTask datafeedTask = (DatafeedTask) allocatedPersistentTask; DatafeedTask datafeedTask = (DatafeedTask) allocatedPersistentTask;
datafeedTask.datafeedJobRunner = datafeedJobRunner; datafeedTask.datafeedManager = datafeedManager;
datafeedJobRunner.run(datafeedTask, datafeedManager.run(datafeedTask,
(error) -> { (error) -> {
if (error != null) { if (error != null) {
listener.onFailure(error); listener.onFailure(error);

View File

@ -52,7 +52,7 @@ import java.util.function.Supplier;
import static org.elasticsearch.xpack.persistent.PersistentTasksService.WaitForPersistentTaskStatusListener; import static org.elasticsearch.xpack.persistent.PersistentTasksService.WaitForPersistentTaskStatusListener;
public class DatafeedJobRunner extends AbstractComponent { public class DatafeedManager extends AbstractComponent {
private static final String INF_SYMBOL = "\u221E"; private static final String INF_SYMBOL = "\u221E";
@ -65,8 +65,8 @@ public class DatafeedJobRunner extends AbstractComponent {
private final Auditor auditor; private final Auditor auditor;
private final ConcurrentMap<String, Holder> runningDatafeeds = new ConcurrentHashMap<>(); private final ConcurrentMap<String, Holder> runningDatafeeds = new ConcurrentHashMap<>();
public DatafeedJobRunner(ThreadPool threadPool, Client client, ClusterService clusterService, JobProvider jobProvider, public DatafeedManager(ThreadPool threadPool, Client client, ClusterService clusterService, JobProvider jobProvider,
Supplier<Long> currentTimeSupplier, Auditor auditor, PersistentTasksService persistentTasksService) { Supplier<Long> currentTimeSupplier, Auditor auditor, PersistentTasksService persistentTasksService) {
super(Settings.EMPTY); super(Settings.EMPTY);
this.client = Objects.requireNonNull(client); this.client = Objects.requireNonNull(client);
this.clusterService = Objects.requireNonNull(clusterService); this.clusterService = Objects.requireNonNull(clusterService);

View File

@ -34,8 +34,8 @@ import java.util.Collections;
import java.util.Date; import java.util.Date;
import static org.elasticsearch.xpack.ml.action.OpenJobActionTests.createJobTask; import static org.elasticsearch.xpack.ml.action.OpenJobActionTests.createJobTask;
import static org.elasticsearch.xpack.ml.datafeed.DatafeedJobRunnerTests.createDatafeedConfig; import static org.elasticsearch.xpack.ml.datafeed.DatafeedManagerTests.createDatafeedConfig;
import static org.elasticsearch.xpack.ml.datafeed.DatafeedJobRunnerTests.createDatafeedJob; import static org.elasticsearch.xpack.ml.datafeed.DatafeedManagerTests.createDatafeedJob;
import static org.elasticsearch.xpack.ml.job.config.JobTests.buildJobBuilder; import static org.elasticsearch.xpack.ml.job.config.JobTests.buildJobBuilder;
import static org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.INITIAL_ASSIGNMENT; import static org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.INITIAL_ASSIGNMENT;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;

View File

@ -18,8 +18,8 @@ import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.ml.MlMetadata; import org.elasticsearch.xpack.ml.MlMetadata;
import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.ml.datafeed.DatafeedJobRunner; import org.elasticsearch.xpack.ml.datafeed.DatafeedManager;
import org.elasticsearch.xpack.ml.datafeed.DatafeedJobRunnerTests; import org.elasticsearch.xpack.ml.datafeed.DatafeedManagerTests;
import org.elasticsearch.xpack.ml.datafeed.DatafeedState; import org.elasticsearch.xpack.ml.datafeed.DatafeedState;
import org.elasticsearch.xpack.ml.job.config.Job; import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.config.JobState; import org.elasticsearch.xpack.ml.job.config.JobState;
@ -112,7 +112,7 @@ public class StartDatafeedActionTests extends ESTestCase {
} }
public void testValidate() { public void testValidate() {
Job job1 = DatafeedJobRunnerTests.createDatafeedJob().build(new Date()); Job job1 = DatafeedManagerTests.createDatafeedJob().build(new Date());
MlMetadata mlMetadata1 = new MlMetadata.Builder() MlMetadata mlMetadata1 = new MlMetadata.Builder()
.putJob(job1, false) .putJob(job1, false)
.build(); .build();
@ -122,14 +122,14 @@ public class StartDatafeedActionTests extends ESTestCase {
} }
public void testValidate_jobClosed() { public void testValidate_jobClosed() {
Job job1 = DatafeedJobRunnerTests.createDatafeedJob().build(new Date()); Job job1 = DatafeedManagerTests.createDatafeedJob().build(new Date());
MlMetadata mlMetadata1 = new MlMetadata.Builder() MlMetadata mlMetadata1 = new MlMetadata.Builder()
.putJob(job1, false) .putJob(job1, false)
.build(); .build();
PersistentTask<OpenJobAction.Request> task = PersistentTask<OpenJobAction.Request> task =
new PersistentTask<>(0L, OpenJobAction.NAME, new OpenJobAction.Request("job_id"), INITIAL_ASSIGNMENT); new PersistentTask<>(0L, OpenJobAction.NAME, new OpenJobAction.Request("job_id"), INITIAL_ASSIGNMENT);
PersistentTasksCustomMetaData tasks = new PersistentTasksCustomMetaData(0L, Collections.singletonMap(0L, task)); PersistentTasksCustomMetaData tasks = new PersistentTasksCustomMetaData(0L, Collections.singletonMap(0L, task));
DatafeedConfig datafeedConfig1 = DatafeedJobRunnerTests.createDatafeedConfig("foo-datafeed", "job_id").build(); DatafeedConfig datafeedConfig1 = DatafeedManagerTests.createDatafeedConfig("foo-datafeed", "job_id").build();
MlMetadata mlMetadata2 = new MlMetadata.Builder(mlMetadata1) MlMetadata mlMetadata2 = new MlMetadata.Builder(mlMetadata1)
.putDatafeed(datafeedConfig1) .putDatafeed(datafeedConfig1)
.build(); .build();
@ -164,9 +164,9 @@ public class StartDatafeedActionTests extends ESTestCase {
public static StartDatafeedAction.DatafeedTask createDatafeedTask(long id, String type, String action, public static StartDatafeedAction.DatafeedTask createDatafeedTask(long id, String type, String action,
TaskId parentTaskId, TaskId parentTaskId,
StartDatafeedAction.Request request, StartDatafeedAction.Request request,
DatafeedJobRunner datafeedJobRunner) { DatafeedManager datafeedManager) {
StartDatafeedAction.DatafeedTask task = new StartDatafeedAction.DatafeedTask(id, type, action, parentTaskId, request); StartDatafeedAction.DatafeedTask task = new StartDatafeedAction.DatafeedTask(id, type, action, parentTaskId, request);
task.datafeedJobRunner = datafeedJobRunner; task.datafeedManager = datafeedManager;
return task; return task;
} }

View File

@ -22,8 +22,8 @@ import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.Persiste
import java.util.Collections; import java.util.Collections;
import java.util.Date; import java.util.Date;
import static org.elasticsearch.xpack.ml.datafeed.DatafeedJobRunnerTests.createDatafeedConfig; import static org.elasticsearch.xpack.ml.datafeed.DatafeedManagerTests.createDatafeedConfig;
import static org.elasticsearch.xpack.ml.datafeed.DatafeedJobRunnerTests.createDatafeedJob; import static org.elasticsearch.xpack.ml.datafeed.DatafeedManagerTests.createDatafeedJob;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
public class StopDatafeedActionRequestTests extends AbstractStreamableXContentTestCase<Request> { public class StopDatafeedActionRequestTests extends AbstractStreamableXContentTestCase<Request> {

View File

@ -76,7 +76,7 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
public class DatafeedJobRunnerTests extends ESTestCase { public class DatafeedManagerTests extends ESTestCase {
private Client client; private Client client;
private ActionFuture<PostDataAction.Response> jobDataFuture; private ActionFuture<PostDataAction.Response> jobDataFuture;
@ -84,7 +84,7 @@ public class DatafeedJobRunnerTests extends ESTestCase {
private ClusterService clusterService; private ClusterService clusterService;
private ThreadPool threadPool; private ThreadPool threadPool;
private DataExtractorFactory dataExtractorFactory; private DataExtractorFactory dataExtractorFactory;
private DatafeedJobRunner datafeedJobRunner; private DatafeedManager datafeedManager;
private long currentTime = 120000; private long currentTime = 120000;
private Auditor auditor; private Auditor auditor;
@ -144,7 +144,7 @@ public class DatafeedJobRunnerTests extends ESTestCase {
when(client.execute(same(FlushJobAction.INSTANCE), any())).thenReturn(flushJobFuture); when(client.execute(same(FlushJobAction.INSTANCE), any())).thenReturn(flushJobFuture);
PersistentTasksService persistentTasksService = mock(PersistentTasksService.class); PersistentTasksService persistentTasksService = mock(PersistentTasksService.class);
datafeedJobRunner = new DatafeedJobRunner(threadPool, client, clusterService, jobProvider, () -> currentTime, auditor, datafeedManager = new DatafeedManager(threadPool, client, clusterService, jobProvider, () -> currentTime, auditor,
persistentTasksService) { persistentTasksService) {
@Override @Override
DataExtractorFactory createDataExtractorFactory(DatafeedConfig datafeedConfig, Job job) { DataExtractorFactory createDataExtractorFactory(DatafeedConfig datafeedConfig, Job job) {
@ -167,7 +167,7 @@ public class DatafeedJobRunnerTests extends ESTestCase {
when(dataExtractor.next()).thenReturn(Optional.empty()); when(dataExtractor.next()).thenReturn(Optional.empty());
Consumer<Exception> handler = mockConsumer(); Consumer<Exception> handler = mockConsumer();
DatafeedTask task = createDatafeedTask("datafeed_id", 0L, 60000L); DatafeedTask task = createDatafeedTask("datafeed_id", 0L, 60000L);
datafeedJobRunner.run(task, handler); datafeedManager.run(task, handler);
verify(threadPool, times(1)).executor(MachineLearning.DATAFEED_THREAD_POOL_NAME); verify(threadPool, times(1)).executor(MachineLearning.DATAFEED_THREAD_POOL_NAME);
verify(threadPool, never()).schedule(any(), any(), any()); verify(threadPool, never()).schedule(any(), any(), any());
@ -189,7 +189,7 @@ public class DatafeedJobRunnerTests extends ESTestCase {
when(jobDataFuture.actionGet()).thenReturn(new PostDataAction.Response(dataCounts)); when(jobDataFuture.actionGet()).thenReturn(new PostDataAction.Response(dataCounts));
Consumer<Exception> handler = mockConsumer(); Consumer<Exception> handler = mockConsumer();
DatafeedTask task = createDatafeedTask("datafeed_id", 0L, 60000L); DatafeedTask task = createDatafeedTask("datafeed_id", 0L, 60000L);
datafeedJobRunner.run(task, handler); datafeedManager.run(task, handler);
verify(threadPool, times(1)).executor(MachineLearning.DATAFEED_THREAD_POOL_NAME); verify(threadPool, times(1)).executor(MachineLearning.DATAFEED_THREAD_POOL_NAME);
verify(threadPool, never()).schedule(any(), any(), any()); verify(threadPool, never()).schedule(any(), any(), any());
@ -219,7 +219,7 @@ public class DatafeedJobRunnerTests extends ESTestCase {
when(jobDataFuture.actionGet()).thenReturn(new PostDataAction.Response(dataCounts)); when(jobDataFuture.actionGet()).thenReturn(new PostDataAction.Response(dataCounts));
Consumer<Exception> handler = mockConsumer(); Consumer<Exception> handler = mockConsumer();
DatafeedTask task = createDatafeedTask("datafeed_id", 0L, 60000L); DatafeedTask task = createDatafeedTask("datafeed_id", 0L, 60000L);
datafeedJobRunner.run(task, handler); datafeedManager.run(task, handler);
verify(threadPool, times(1)).executor(MachineLearning.DATAFEED_THREAD_POOL_NAME); verify(threadPool, times(1)).executor(MachineLearning.DATAFEED_THREAD_POOL_NAME);
verify(threadPool, never()).schedule(any(), any(), any()); verify(threadPool, never()).schedule(any(), any(), any());
@ -254,8 +254,8 @@ public class DatafeedJobRunnerTests extends ESTestCase {
when(dataExtractor.hasNext()).thenReturn(false); when(dataExtractor.hasNext()).thenReturn(false);
Consumer<Exception> handler = mockConsumer(); Consumer<Exception> handler = mockConsumer();
DatafeedTask task = createDatafeedTask("datafeed_id", 0L, null); DatafeedTask task = createDatafeedTask("datafeed_id", 0L, null);
DatafeedJobRunner.Holder holder = datafeedJobRunner.createJobDatafeed(datafeedConfig, job, 100, 100, handler, task); DatafeedManager.Holder holder = datafeedManager.createJobDatafeed(datafeedConfig, job, 100, 100, handler, task);
datafeedJobRunner.doDatafeedRealtime(10L, "foo", holder); datafeedManager.doDatafeedRealtime(10L, "foo", holder);
verify(threadPool, times(11)).schedule(any(), eq(MachineLearning.DATAFEED_THREAD_POOL_NAME), any()); verify(threadPool, times(11)).schedule(any(), eq(MachineLearning.DATAFEED_THREAD_POOL_NAME), any());
verify(auditor, times(1)).warning(eq("job_id"), anyString()); verify(auditor, times(1)).warning(eq("job_id"), anyString());
@ -278,9 +278,9 @@ public class DatafeedJobRunnerTests extends ESTestCase {
boolean cancelled = randomBoolean(); boolean cancelled = randomBoolean();
StartDatafeedAction.Request startDatafeedRequest = new StartDatafeedAction.Request("datafeed_id", 0L); StartDatafeedAction.Request startDatafeedRequest = new StartDatafeedAction.Request("datafeed_id", 0L);
DatafeedTask task = StartDatafeedActionTests.createDatafeedTask(1, "type", "action", null, DatafeedTask task = StartDatafeedActionTests.createDatafeedTask(1, "type", "action", null,
startDatafeedRequest, datafeedJobRunner); startDatafeedRequest, datafeedManager);
task = spyDatafeedTask(task); task = spyDatafeedTask(task);
datafeedJobRunner.run(task, handler); datafeedManager.run(task, handler);
verify(threadPool, times(1)).executor(MachineLearning.DATAFEED_THREAD_POOL_NAME); verify(threadPool, times(1)).executor(MachineLearning.DATAFEED_THREAD_POOL_NAME);
if (cancelled) { if (cancelled) {

View File

@ -11,7 +11,7 @@ import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.ml.datafeed.ChunkingConfig; import org.elasticsearch.xpack.ml.datafeed.ChunkingConfig;
import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.ml.datafeed.DatafeedJobRunnerTests; import org.elasticsearch.xpack.ml.datafeed.DatafeedManagerTests;
import org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.AggregationDataExtractorFactory; import org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.AggregationDataExtractorFactory;
import org.elasticsearch.xpack.ml.datafeed.extractor.chunked.ChunkedDataExtractorFactory; import org.elasticsearch.xpack.ml.datafeed.extractor.chunked.ChunkedDataExtractorFactory;
import org.elasticsearch.xpack.ml.datafeed.extractor.scroll.ScrollDataExtractorFactory; import org.elasticsearch.xpack.ml.datafeed.extractor.scroll.ScrollDataExtractorFactory;
@ -36,9 +36,9 @@ public class DataExtractorFactoryTests extends ESTestCase {
public void testCreateDataExtractorFactoryGivenDefaultScroll() { public void testCreateDataExtractorFactoryGivenDefaultScroll() {
DataDescription.Builder dataDescription = new DataDescription.Builder(); DataDescription.Builder dataDescription = new DataDescription.Builder();
dataDescription.setTimeField("time"); dataDescription.setTimeField("time");
Job.Builder jobBuilder = DatafeedJobRunnerTests.createDatafeedJob(); Job.Builder jobBuilder = DatafeedManagerTests.createDatafeedJob();
jobBuilder.setDataDescription(dataDescription); jobBuilder.setDataDescription(dataDescription);
DatafeedConfig datafeedConfig = DatafeedJobRunnerTests.createDatafeedConfig("datafeed1", "foo").build(); DatafeedConfig datafeedConfig = DatafeedManagerTests.createDatafeedConfig("datafeed1", "foo").build();
DataExtractorFactory dataExtractorFactory = DataExtractorFactory dataExtractorFactory =
DataExtractorFactory.create(client, datafeedConfig, jobBuilder.build(new Date())); DataExtractorFactory.create(client, datafeedConfig, jobBuilder.build(new Date()));
@ -49,9 +49,9 @@ public class DataExtractorFactoryTests extends ESTestCase {
public void testCreateDataExtractorFactoryGivenScrollWithAutoChunk() { public void testCreateDataExtractorFactoryGivenScrollWithAutoChunk() {
DataDescription.Builder dataDescription = new DataDescription.Builder(); DataDescription.Builder dataDescription = new DataDescription.Builder();
dataDescription.setTimeField("time"); dataDescription.setTimeField("time");
Job.Builder jobBuilder = DatafeedJobRunnerTests.createDatafeedJob(); Job.Builder jobBuilder = DatafeedManagerTests.createDatafeedJob();
jobBuilder.setDataDescription(dataDescription); jobBuilder.setDataDescription(dataDescription);
DatafeedConfig.Builder datafeedConfig = DatafeedJobRunnerTests.createDatafeedConfig("datafeed1", "foo"); DatafeedConfig.Builder datafeedConfig = DatafeedManagerTests.createDatafeedConfig("datafeed1", "foo");
datafeedConfig.setChunkingConfig(ChunkingConfig.newAuto()); datafeedConfig.setChunkingConfig(ChunkingConfig.newAuto());
DataExtractorFactory dataExtractorFactory = DataExtractorFactory dataExtractorFactory =
@ -63,9 +63,9 @@ public class DataExtractorFactoryTests extends ESTestCase {
public void testCreateDataExtractorFactoryGivenScrollWithOffChunk() { public void testCreateDataExtractorFactoryGivenScrollWithOffChunk() {
DataDescription.Builder dataDescription = new DataDescription.Builder(); DataDescription.Builder dataDescription = new DataDescription.Builder();
dataDescription.setTimeField("time"); dataDescription.setTimeField("time");
Job.Builder jobBuilder = DatafeedJobRunnerTests.createDatafeedJob(); Job.Builder jobBuilder = DatafeedManagerTests.createDatafeedJob();
jobBuilder.setDataDescription(dataDescription); jobBuilder.setDataDescription(dataDescription);
DatafeedConfig.Builder datafeedConfig = DatafeedJobRunnerTests.createDatafeedConfig("datafeed1", "foo"); DatafeedConfig.Builder datafeedConfig = DatafeedManagerTests.createDatafeedConfig("datafeed1", "foo");
datafeedConfig.setChunkingConfig(ChunkingConfig.newOff()); datafeedConfig.setChunkingConfig(ChunkingConfig.newOff());
DataExtractorFactory dataExtractorFactory = DataExtractorFactory dataExtractorFactory =
@ -77,9 +77,9 @@ public class DataExtractorFactoryTests extends ESTestCase {
public void testCreateDataExtractorFactoryGivenAggregation() { public void testCreateDataExtractorFactoryGivenAggregation() {
DataDescription.Builder dataDescription = new DataDescription.Builder(); DataDescription.Builder dataDescription = new DataDescription.Builder();
dataDescription.setTimeField("time"); dataDescription.setTimeField("time");
Job.Builder jobBuilder = DatafeedJobRunnerTests.createDatafeedJob(); Job.Builder jobBuilder = DatafeedManagerTests.createDatafeedJob();
jobBuilder.setDataDescription(dataDescription); jobBuilder.setDataDescription(dataDescription);
DatafeedConfig.Builder datafeedConfig = DatafeedJobRunnerTests.createDatafeedConfig("datafeed1", "foo"); DatafeedConfig.Builder datafeedConfig = DatafeedManagerTests.createDatafeedConfig("datafeed1", "foo");
datafeedConfig.setAggregations(AggregatorFactories.builder().addAggregator( datafeedConfig.setAggregations(AggregatorFactories.builder().addAggregator(
AggregationBuilders.histogram("time").interval(300000))); AggregationBuilders.histogram("time").interval(300000)));
@ -92,9 +92,9 @@ public class DataExtractorFactoryTests extends ESTestCase {
public void testCreateDataExtractorFactoryGivenDefaultAggregationWithAutoChunk() { public void testCreateDataExtractorFactoryGivenDefaultAggregationWithAutoChunk() {
DataDescription.Builder dataDescription = new DataDescription.Builder(); DataDescription.Builder dataDescription = new DataDescription.Builder();
dataDescription.setTimeField("time"); dataDescription.setTimeField("time");
Job.Builder jobBuilder = DatafeedJobRunnerTests.createDatafeedJob(); Job.Builder jobBuilder = DatafeedManagerTests.createDatafeedJob();
jobBuilder.setDataDescription(dataDescription); jobBuilder.setDataDescription(dataDescription);
DatafeedConfig.Builder datafeedConfig = DatafeedJobRunnerTests.createDatafeedConfig("datafeed1", "foo"); DatafeedConfig.Builder datafeedConfig = DatafeedManagerTests.createDatafeedConfig("datafeed1", "foo");
datafeedConfig.setAggregations(AggregatorFactories.builder().addAggregator( datafeedConfig.setAggregations(AggregatorFactories.builder().addAggregator(
AggregationBuilders.histogram("time").interval(300000))); AggregationBuilders.histogram("time").interval(300000)));
datafeedConfig.setChunkingConfig(ChunkingConfig.newAuto()); datafeedConfig.setChunkingConfig(ChunkingConfig.newAuto());