diff --git a/dev-tools/checkstyle_suppressions.xml b/dev-tools/checkstyle_suppressions.xml index 0d1f63c3256..f087898e66c 100644 --- a/dev-tools/checkstyle_suppressions.xml +++ b/dev-tools/checkstyle_suppressions.xml @@ -128,7 +128,7 @@ - + @@ -878,7 +878,7 @@ - + diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/InvalidLicenseEnforcer.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/InvalidLicenseEnforcer.java index f1e715bef1a..74b8226a6be 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/InvalidLicenseEnforcer.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/InvalidLicenseEnforcer.java @@ -10,22 +10,22 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.license.XPackLicenseState; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.xpack.ml.datafeed.DatafeedJobRunner; +import org.elasticsearch.xpack.ml.datafeed.DatafeedManager; import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager; public class InvalidLicenseEnforcer extends AbstractComponent { private final ThreadPool threadPool; private final XPackLicenseState licenseState; - private final DatafeedJobRunner datafeedJobRunner; + private final DatafeedManager datafeedManager; private final AutodetectProcessManager autodetectProcessManager; InvalidLicenseEnforcer(Settings settings, XPackLicenseState licenseState, ThreadPool threadPool, - DatafeedJobRunner datafeedJobRunner, AutodetectProcessManager autodetectProcessManager) { + DatafeedManager datafeedManager, AutodetectProcessManager autodetectProcessManager) { super(settings); this.threadPool = threadPool; this.licenseState = licenseState; - this.datafeedJobRunner = datafeedJobRunner; + this.datafeedManager = datafeedManager; this.autodetectProcessManager = autodetectProcessManager; licenseState.addListener(this::closeJobsAndDatafeedsIfLicenseExpired); } @@ -40,7 +40,7 @@ public class InvalidLicenseEnforcer extends AbstractComponent { @Override protected void doRun() throws Exception { - datafeedJobRunner.stopAllDatafeeds("invalid license"); + datafeedManager.stopAllDatafeeds("invalid license"); autodetectProcessManager.closeAllJobs("invalid license"); } }); diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index 81041292190..b1c87951c5d 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -70,7 +70,7 @@ import org.elasticsearch.xpack.ml.action.UpdateModelSnapshotAction; import org.elasticsearch.xpack.ml.action.UpdateProcessAction; import org.elasticsearch.xpack.ml.action.ValidateDetectorAction; import org.elasticsearch.xpack.ml.action.ValidateJobConfigAction; -import org.elasticsearch.xpack.ml.datafeed.DatafeedJobRunner; +import org.elasticsearch.xpack.ml.datafeed.DatafeedManager; import org.elasticsearch.xpack.ml.datafeed.DatafeedState; import org.elasticsearch.xpack.ml.job.JobManager; import org.elasticsearch.xpack.ml.job.UpdateJobProcessNotifier; @@ -299,16 +299,16 @@ public class MachineLearning implements ActionPlugin { jobManager, jobProvider, jobResultsPersister, jobDataCountsPersister, autodetectProcessFactory, normalizerFactory, xContentRegistry); PersistentTasksService persistentTasksService = new PersistentTasksService(settings, clusterService, threadPool, internalClient); - DatafeedJobRunner datafeedJobRunner = new DatafeedJobRunner(threadPool, internalClient, clusterService, jobProvider, + DatafeedManager datafeedManager = new DatafeedManager(threadPool, internalClient, clusterService, jobProvider, System::currentTimeMillis, auditor, persistentTasksService); InvalidLicenseEnforcer invalidLicenseEnforcer = - new InvalidLicenseEnforcer(settings, licenseState, threadPool, datafeedJobRunner, autodetectProcessManager); + new InvalidLicenseEnforcer(settings, licenseState, threadPool, datafeedManager, autodetectProcessManager); PersistentTasksExecutorRegistry persistentTasksExecutorRegistry = new PersistentTasksExecutorRegistry(Settings.EMPTY, Arrays.asList( new OpenJobAction.OpenJobPersistentTasksExecutor(settings, threadPool, licenseState, persistentTasksService, clusterService, autodetectProcessManager, auditor), new StartDatafeedAction.StartDatafeedPersistentTasksExecutor(settings, threadPool, licenseState, persistentTasksService, - datafeedJobRunner, auditor) + datafeedManager, auditor) )); return Arrays.asList( @@ -319,7 +319,7 @@ public class MachineLearning implements ActionPlugin { new MachineLearningTemplateRegistry(settings, clusterService, internalClient, threadPool), new MlInitializationService(settings, threadPool, clusterService, internalClient), jobDataCountsPersister, - datafeedJobRunner, + datafeedManager, persistentTasksService, persistentTasksExecutorRegistry, new PersistentTasksClusterService(Settings.EMPTY, persistentTasksExecutorRegistry, clusterService), diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StartDatafeedAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StartDatafeedAction.java index a8f290e1774..2d755d9fec1 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StartDatafeedAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StartDatafeedAction.java @@ -44,8 +44,8 @@ import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.XPackPlugin; import org.elasticsearch.xpack.ml.MlMetadata; import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig; -import org.elasticsearch.xpack.ml.datafeed.DatafeedJobRunner; import org.elasticsearch.xpack.ml.datafeed.DatafeedJobValidator; +import org.elasticsearch.xpack.ml.datafeed.DatafeedManager; import org.elasticsearch.xpack.ml.datafeed.DatafeedState; import org.elasticsearch.xpack.ml.job.config.Job; import org.elasticsearch.xpack.ml.job.config.JobState; @@ -296,7 +296,7 @@ public class StartDatafeedAction private final long startTime; private final Long endTime; /* only pck protected for testing */ - volatile DatafeedJobRunner datafeedJobRunner; + volatile DatafeedManager datafeedManager; DatafeedTask(long id, String type, String action, TaskId parentTaskId, Request request) { super(id, type, action, "datafeed-" + request.getDatafeedId(), parentTaskId); @@ -332,7 +332,7 @@ public class StartDatafeedAction } public void stop(String reason, TimeValue timeout) { - datafeedJobRunner.stopDatafeed(datafeedId, reason, timeout); + datafeedManager.stopDatafeed(datafeedId, reason, timeout); } } @@ -394,17 +394,17 @@ public class StartDatafeedAction } public static class StartDatafeedPersistentTasksExecutor extends PersistentTasksExecutor { - private final DatafeedJobRunner datafeedJobRunner; + private final DatafeedManager datafeedManager; private final XPackLicenseState licenseState; private final Auditor auditor; private final ThreadPool threadPool; public StartDatafeedPersistentTasksExecutor(Settings settings, ThreadPool threadPool, XPackLicenseState licenseState, - PersistentTasksService persistentTasksService, DatafeedJobRunner datafeedJobRunner, + PersistentTasksService persistentTasksService, DatafeedManager datafeedManager, Auditor auditor) { super(settings, NAME, persistentTasksService, ThreadPool.Names.MANAGEMENT); this.licenseState = licenseState; - this.datafeedJobRunner = datafeedJobRunner; + this.datafeedManager = datafeedManager; this.auditor = auditor; this.threadPool = threadPool; } @@ -431,8 +431,8 @@ public class StartDatafeedAction protected void nodeOperation(AllocatedPersistentTask allocatedPersistentTask, Request request, ActionListener listener) { DatafeedTask datafeedTask = (DatafeedTask) allocatedPersistentTask; - datafeedTask.datafeedJobRunner = datafeedJobRunner; - datafeedJobRunner.run(datafeedTask, + datafeedTask.datafeedManager = datafeedManager; + datafeedManager.run(datafeedTask, (error) -> { if (error != null) { listener.onFailure(error); diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobRunner.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java similarity index 98% rename from plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobRunner.java rename to plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java index 68bd7e713df..6c07277b0f9 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobRunner.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java @@ -52,7 +52,7 @@ import java.util.function.Supplier; import static org.elasticsearch.xpack.persistent.PersistentTasksService.WaitForPersistentTaskStatusListener; -public class DatafeedJobRunner extends AbstractComponent { +public class DatafeedManager extends AbstractComponent { private static final String INF_SYMBOL = "\u221E"; @@ -65,8 +65,8 @@ public class DatafeedJobRunner extends AbstractComponent { private final Auditor auditor; private final ConcurrentMap runningDatafeeds = new ConcurrentHashMap<>(); - public DatafeedJobRunner(ThreadPool threadPool, Client client, ClusterService clusterService, JobProvider jobProvider, - Supplier currentTimeSupplier, Auditor auditor, PersistentTasksService persistentTasksService) { + public DatafeedManager(ThreadPool threadPool, Client client, ClusterService clusterService, JobProvider jobProvider, + Supplier currentTimeSupplier, Auditor auditor, PersistentTasksService persistentTasksService) { super(Settings.EMPTY); this.client = Objects.requireNonNull(client); this.clusterService = Objects.requireNonNull(clusterService); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/MlMetadataTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/MlMetadataTests.java index 9b1ac976a1f..bc909ce0615 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/MlMetadataTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/MlMetadataTests.java @@ -34,8 +34,8 @@ import java.util.Collections; import java.util.Date; import static org.elasticsearch.xpack.ml.action.OpenJobActionTests.createJobTask; -import static org.elasticsearch.xpack.ml.datafeed.DatafeedJobRunnerTests.createDatafeedConfig; -import static org.elasticsearch.xpack.ml.datafeed.DatafeedJobRunnerTests.createDatafeedJob; +import static org.elasticsearch.xpack.ml.datafeed.DatafeedManagerTests.createDatafeedConfig; +import static org.elasticsearch.xpack.ml.datafeed.DatafeedManagerTests.createDatafeedJob; import static org.elasticsearch.xpack.ml.job.config.JobTests.buildJobBuilder; import static org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.INITIAL_ASSIGNMENT; import static org.hamcrest.Matchers.equalTo; diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/StartDatafeedActionTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/StartDatafeedActionTests.java index cee80ca2724..7ba9c05896a 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/StartDatafeedActionTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/StartDatafeedActionTests.java @@ -18,8 +18,8 @@ import org.elasticsearch.tasks.TaskId; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xpack.ml.MlMetadata; import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig; -import org.elasticsearch.xpack.ml.datafeed.DatafeedJobRunner; -import org.elasticsearch.xpack.ml.datafeed.DatafeedJobRunnerTests; +import org.elasticsearch.xpack.ml.datafeed.DatafeedManager; +import org.elasticsearch.xpack.ml.datafeed.DatafeedManagerTests; import org.elasticsearch.xpack.ml.datafeed.DatafeedState; import org.elasticsearch.xpack.ml.job.config.Job; import org.elasticsearch.xpack.ml.job.config.JobState; @@ -112,7 +112,7 @@ public class StartDatafeedActionTests extends ESTestCase { } public void testValidate() { - Job job1 = DatafeedJobRunnerTests.createDatafeedJob().build(new Date()); + Job job1 = DatafeedManagerTests.createDatafeedJob().build(new Date()); MlMetadata mlMetadata1 = new MlMetadata.Builder() .putJob(job1, false) .build(); @@ -122,14 +122,14 @@ public class StartDatafeedActionTests extends ESTestCase { } public void testValidate_jobClosed() { - Job job1 = DatafeedJobRunnerTests.createDatafeedJob().build(new Date()); + Job job1 = DatafeedManagerTests.createDatafeedJob().build(new Date()); MlMetadata mlMetadata1 = new MlMetadata.Builder() .putJob(job1, false) .build(); PersistentTask task = new PersistentTask<>(0L, OpenJobAction.NAME, new OpenJobAction.Request("job_id"), INITIAL_ASSIGNMENT); PersistentTasksCustomMetaData tasks = new PersistentTasksCustomMetaData(0L, Collections.singletonMap(0L, task)); - DatafeedConfig datafeedConfig1 = DatafeedJobRunnerTests.createDatafeedConfig("foo-datafeed", "job_id").build(); + DatafeedConfig datafeedConfig1 = DatafeedManagerTests.createDatafeedConfig("foo-datafeed", "job_id").build(); MlMetadata mlMetadata2 = new MlMetadata.Builder(mlMetadata1) .putDatafeed(datafeedConfig1) .build(); @@ -164,9 +164,9 @@ public class StartDatafeedActionTests extends ESTestCase { public static StartDatafeedAction.DatafeedTask createDatafeedTask(long id, String type, String action, TaskId parentTaskId, StartDatafeedAction.Request request, - DatafeedJobRunner datafeedJobRunner) { + DatafeedManager datafeedManager) { StartDatafeedAction.DatafeedTask task = new StartDatafeedAction.DatafeedTask(id, type, action, parentTaskId, request); - task.datafeedJobRunner = datafeedJobRunner; + task.datafeedManager = datafeedManager; return task; } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/StopDatafeedActionRequestTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/StopDatafeedActionRequestTests.java index 42193efdaff..fb60675acf5 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/StopDatafeedActionRequestTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/StopDatafeedActionRequestTests.java @@ -22,8 +22,8 @@ import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.Persiste import java.util.Collections; import java.util.Date; -import static org.elasticsearch.xpack.ml.datafeed.DatafeedJobRunnerTests.createDatafeedConfig; -import static org.elasticsearch.xpack.ml.datafeed.DatafeedJobRunnerTests.createDatafeedJob; +import static org.elasticsearch.xpack.ml.datafeed.DatafeedManagerTests.createDatafeedConfig; +import static org.elasticsearch.xpack.ml.datafeed.DatafeedManagerTests.createDatafeedJob; import static org.hamcrest.Matchers.equalTo; public class StopDatafeedActionRequestTests extends AbstractStreamableXContentTestCase { diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobRunnerTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManagerTests.java similarity index 96% rename from plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobRunnerTests.java rename to plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManagerTests.java index 97cc0f086a9..fca20330051 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobRunnerTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManagerTests.java @@ -76,7 +76,7 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -public class DatafeedJobRunnerTests extends ESTestCase { +public class DatafeedManagerTests extends ESTestCase { private Client client; private ActionFuture jobDataFuture; @@ -84,7 +84,7 @@ public class DatafeedJobRunnerTests extends ESTestCase { private ClusterService clusterService; private ThreadPool threadPool; private DataExtractorFactory dataExtractorFactory; - private DatafeedJobRunner datafeedJobRunner; + private DatafeedManager datafeedManager; private long currentTime = 120000; private Auditor auditor; @@ -144,7 +144,7 @@ public class DatafeedJobRunnerTests extends ESTestCase { when(client.execute(same(FlushJobAction.INSTANCE), any())).thenReturn(flushJobFuture); PersistentTasksService persistentTasksService = mock(PersistentTasksService.class); - datafeedJobRunner = new DatafeedJobRunner(threadPool, client, clusterService, jobProvider, () -> currentTime, auditor, + datafeedManager = new DatafeedManager(threadPool, client, clusterService, jobProvider, () -> currentTime, auditor, persistentTasksService) { @Override DataExtractorFactory createDataExtractorFactory(DatafeedConfig datafeedConfig, Job job) { @@ -167,7 +167,7 @@ public class DatafeedJobRunnerTests extends ESTestCase { when(dataExtractor.next()).thenReturn(Optional.empty()); Consumer handler = mockConsumer(); DatafeedTask task = createDatafeedTask("datafeed_id", 0L, 60000L); - datafeedJobRunner.run(task, handler); + datafeedManager.run(task, handler); verify(threadPool, times(1)).executor(MachineLearning.DATAFEED_THREAD_POOL_NAME); verify(threadPool, never()).schedule(any(), any(), any()); @@ -189,7 +189,7 @@ public class DatafeedJobRunnerTests extends ESTestCase { when(jobDataFuture.actionGet()).thenReturn(new PostDataAction.Response(dataCounts)); Consumer handler = mockConsumer(); DatafeedTask task = createDatafeedTask("datafeed_id", 0L, 60000L); - datafeedJobRunner.run(task, handler); + datafeedManager.run(task, handler); verify(threadPool, times(1)).executor(MachineLearning.DATAFEED_THREAD_POOL_NAME); verify(threadPool, never()).schedule(any(), any(), any()); @@ -219,7 +219,7 @@ public class DatafeedJobRunnerTests extends ESTestCase { when(jobDataFuture.actionGet()).thenReturn(new PostDataAction.Response(dataCounts)); Consumer handler = mockConsumer(); DatafeedTask task = createDatafeedTask("datafeed_id", 0L, 60000L); - datafeedJobRunner.run(task, handler); + datafeedManager.run(task, handler); verify(threadPool, times(1)).executor(MachineLearning.DATAFEED_THREAD_POOL_NAME); verify(threadPool, never()).schedule(any(), any(), any()); @@ -254,8 +254,8 @@ public class DatafeedJobRunnerTests extends ESTestCase { when(dataExtractor.hasNext()).thenReturn(false); Consumer handler = mockConsumer(); DatafeedTask task = createDatafeedTask("datafeed_id", 0L, null); - DatafeedJobRunner.Holder holder = datafeedJobRunner.createJobDatafeed(datafeedConfig, job, 100, 100, handler, task); - datafeedJobRunner.doDatafeedRealtime(10L, "foo", holder); + DatafeedManager.Holder holder = datafeedManager.createJobDatafeed(datafeedConfig, job, 100, 100, handler, task); + datafeedManager.doDatafeedRealtime(10L, "foo", holder); verify(threadPool, times(11)).schedule(any(), eq(MachineLearning.DATAFEED_THREAD_POOL_NAME), any()); verify(auditor, times(1)).warning(eq("job_id"), anyString()); @@ -278,9 +278,9 @@ public class DatafeedJobRunnerTests extends ESTestCase { boolean cancelled = randomBoolean(); StartDatafeedAction.Request startDatafeedRequest = new StartDatafeedAction.Request("datafeed_id", 0L); DatafeedTask task = StartDatafeedActionTests.createDatafeedTask(1, "type", "action", null, - startDatafeedRequest, datafeedJobRunner); + startDatafeedRequest, datafeedManager); task = spyDatafeedTask(task); - datafeedJobRunner.run(task, handler); + datafeedManager.run(task, handler); verify(threadPool, times(1)).executor(MachineLearning.DATAFEED_THREAD_POOL_NAME); if (cancelled) { diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/DataExtractorFactoryTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/DataExtractorFactoryTests.java index b8362fb7eab..a85ef7f8fc5 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/DataExtractorFactoryTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/DataExtractorFactoryTests.java @@ -11,7 +11,7 @@ import org.elasticsearch.search.aggregations.AggregatorFactories; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xpack.ml.datafeed.ChunkingConfig; import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig; -import org.elasticsearch.xpack.ml.datafeed.DatafeedJobRunnerTests; +import org.elasticsearch.xpack.ml.datafeed.DatafeedManagerTests; import org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.AggregationDataExtractorFactory; import org.elasticsearch.xpack.ml.datafeed.extractor.chunked.ChunkedDataExtractorFactory; import org.elasticsearch.xpack.ml.datafeed.extractor.scroll.ScrollDataExtractorFactory; @@ -36,9 +36,9 @@ public class DataExtractorFactoryTests extends ESTestCase { public void testCreateDataExtractorFactoryGivenDefaultScroll() { DataDescription.Builder dataDescription = new DataDescription.Builder(); dataDescription.setTimeField("time"); - Job.Builder jobBuilder = DatafeedJobRunnerTests.createDatafeedJob(); + Job.Builder jobBuilder = DatafeedManagerTests.createDatafeedJob(); jobBuilder.setDataDescription(dataDescription); - DatafeedConfig datafeedConfig = DatafeedJobRunnerTests.createDatafeedConfig("datafeed1", "foo").build(); + DatafeedConfig datafeedConfig = DatafeedManagerTests.createDatafeedConfig("datafeed1", "foo").build(); DataExtractorFactory dataExtractorFactory = DataExtractorFactory.create(client, datafeedConfig, jobBuilder.build(new Date())); @@ -49,9 +49,9 @@ public class DataExtractorFactoryTests extends ESTestCase { public void testCreateDataExtractorFactoryGivenScrollWithAutoChunk() { DataDescription.Builder dataDescription = new DataDescription.Builder(); dataDescription.setTimeField("time"); - Job.Builder jobBuilder = DatafeedJobRunnerTests.createDatafeedJob(); + Job.Builder jobBuilder = DatafeedManagerTests.createDatafeedJob(); jobBuilder.setDataDescription(dataDescription); - DatafeedConfig.Builder datafeedConfig = DatafeedJobRunnerTests.createDatafeedConfig("datafeed1", "foo"); + DatafeedConfig.Builder datafeedConfig = DatafeedManagerTests.createDatafeedConfig("datafeed1", "foo"); datafeedConfig.setChunkingConfig(ChunkingConfig.newAuto()); DataExtractorFactory dataExtractorFactory = @@ -63,9 +63,9 @@ public class DataExtractorFactoryTests extends ESTestCase { public void testCreateDataExtractorFactoryGivenScrollWithOffChunk() { DataDescription.Builder dataDescription = new DataDescription.Builder(); dataDescription.setTimeField("time"); - Job.Builder jobBuilder = DatafeedJobRunnerTests.createDatafeedJob(); + Job.Builder jobBuilder = DatafeedManagerTests.createDatafeedJob(); jobBuilder.setDataDescription(dataDescription); - DatafeedConfig.Builder datafeedConfig = DatafeedJobRunnerTests.createDatafeedConfig("datafeed1", "foo"); + DatafeedConfig.Builder datafeedConfig = DatafeedManagerTests.createDatafeedConfig("datafeed1", "foo"); datafeedConfig.setChunkingConfig(ChunkingConfig.newOff()); DataExtractorFactory dataExtractorFactory = @@ -77,9 +77,9 @@ public class DataExtractorFactoryTests extends ESTestCase { public void testCreateDataExtractorFactoryGivenAggregation() { DataDescription.Builder dataDescription = new DataDescription.Builder(); dataDescription.setTimeField("time"); - Job.Builder jobBuilder = DatafeedJobRunnerTests.createDatafeedJob(); + Job.Builder jobBuilder = DatafeedManagerTests.createDatafeedJob(); jobBuilder.setDataDescription(dataDescription); - DatafeedConfig.Builder datafeedConfig = DatafeedJobRunnerTests.createDatafeedConfig("datafeed1", "foo"); + DatafeedConfig.Builder datafeedConfig = DatafeedManagerTests.createDatafeedConfig("datafeed1", "foo"); datafeedConfig.setAggregations(AggregatorFactories.builder().addAggregator( AggregationBuilders.histogram("time").interval(300000))); @@ -92,9 +92,9 @@ public class DataExtractorFactoryTests extends ESTestCase { public void testCreateDataExtractorFactoryGivenDefaultAggregationWithAutoChunk() { DataDescription.Builder dataDescription = new DataDescription.Builder(); dataDescription.setTimeField("time"); - Job.Builder jobBuilder = DatafeedJobRunnerTests.createDatafeedJob(); + Job.Builder jobBuilder = DatafeedManagerTests.createDatafeedJob(); jobBuilder.setDataDescription(dataDescription); - DatafeedConfig.Builder datafeedConfig = DatafeedJobRunnerTests.createDatafeedConfig("datafeed1", "foo"); + DatafeedConfig.Builder datafeedConfig = DatafeedManagerTests.createDatafeedConfig("datafeed1", "foo"); datafeedConfig.setAggregations(AggregatorFactories.builder().addAggregator( AggregationBuilders.histogram("time").interval(300000))); datafeedConfig.setChunkingConfig(ChunkingConfig.newAuto());