From 9150e77269f9974cd042d8150f12c21cc04e8a8c Mon Sep 17 00:00:00 2001 From: David Kyle Date: Tue, 31 Mar 2020 16:55:37 +0100 Subject: [PATCH] [7.x] Remove unused environment from anomaly detector classes (#54399) (#54456) --- .../elasticsearch/xpack/ml/MachineLearning.java | 2 +- .../elasticsearch/xpack/ml/job/JobManager.java | 12 ++++-------- .../autodetect/AutodetectCommunicator.java | 16 ++++++---------- .../autodetect/AutodetectProcessManager.java | 10 ++++------ .../autodetect/AutodetectCommunicatorTests.java | 6 ++---- .../AutodetectProcessManagerTests.java | 6 ++---- 6 files changed, 19 insertions(+), 33 deletions(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index 9670255174d..a1e9a3bd3cf 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -602,7 +602,7 @@ public class MachineLearning extends Plugin implements SystemIndexPlugin, Analys } NormalizerFactory normalizerFactory = new NormalizerFactory(normalizerProcessFactory, threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME)); - AutodetectProcessManager autodetectProcessManager = new AutodetectProcessManager(environment, settings, client, threadPool, + AutodetectProcessManager autodetectProcessManager = new AutodetectProcessManager(settings, client, threadPool, xContentRegistry, anomalyDetectionAuditor, clusterService, jobManager, jobResultsProvider, jobResultsPersister, jobDataCountsPersister, anomalyDetectionAnnotationPersister, autodetectProcessFactory, normalizerFactory, nativeStorageProvider, indexNameExpressionResolver); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java index c35351e586d..56159025903 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java @@ -88,7 +88,6 @@ public class JobManager { private static final Logger logger = LogManager.getLogger(JobManager.class); private static final DeprecationLogger deprecationLogger = new DeprecationLogger(logger); - private final Environment environment; private final JobResultsProvider jobResultsProvider; private final JobResultsPersister jobResultsPersister; private final ClusterService clusterService; @@ -108,7 +107,6 @@ public class JobManager { JobResultsPersister jobResultsPersister, ClusterService clusterService, AnomalyDetectionAuditor auditor, ThreadPool threadPool, Client client, UpdateJobProcessNotifier updateJobProcessNotifier, NamedXContentRegistry xContentRegistry) { - this.environment = environment; this.jobResultsProvider = Objects.requireNonNull(jobResultsProvider); this.jobResultsPersister = Objects.requireNonNull(jobResultsPersister); this.clusterService = Objects.requireNonNull(clusterService); @@ -227,7 +225,7 @@ public class JobManager { * The overall structure can be validated at parse time, but the exact names need to be checked separately, * as plugins that provide the functionality can be installed/uninstalled. */ - static void validateCategorizationAnalyzer(Job.Builder jobBuilder, AnalysisRegistry analysisRegistry, Environment environment) + static void validateCategorizationAnalyzer(Job.Builder jobBuilder, AnalysisRegistry analysisRegistry) throws IOException { CategorizationAnalyzerConfig categorizationAnalyzerConfig = jobBuilder.getAnalysisConfig().getCategorizationAnalyzerConfig(); if (categorizationAnalyzerConfig != null) { @@ -243,7 +241,7 @@ public class JobManager { ActionListener actionListener) throws IOException { request.getJobBuilder().validateAnalysisLimitsAndSetDefaults(maxModelMemoryLimit); - validateCategorizationAnalyzer(request.getJobBuilder(), analysisRegistry, environment); + validateCategorizationAnalyzer(request.getJobBuilder(), analysisRegistry); Job job = request.getJobBuilder().build(new Date()); @@ -331,9 +329,7 @@ public class JobManager { ); ActionListener checkNoGroupWithTheJobId = ActionListener.wrap( - ok -> { - jobConfigProvider.groupExists(job.getId(), checkNoJobsWithGroupId); - }, + ok -> jobConfigProvider.groupExists(job.getId(), checkNoJobsWithGroupId), actionListener::onFailure ); @@ -400,7 +396,7 @@ public class JobManager { )); } } else { - logger.debug("[{}] No process update required for job update: {}", () -> request.getJobId(), () -> { + logger.debug("[{}] No process update required for job update: {}", request::getJobId, () -> { try { XContentBuilder jsonBuilder = XContentFactory.jsonBuilder(); request.getJobUpdate().toXContent(jsonBuilder, ToXContent.EMPTY_PARAMS); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java index 7f28a0859c6..3fa4c36cb04 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java @@ -15,7 +15,6 @@ import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.XContentType; -import org.elasticsearch.env.Environment; import org.elasticsearch.index.analysis.AnalysisRegistry; import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig; import org.elasticsearch.xpack.core.ml.job.config.CategorizationAnalyzerConfig; @@ -47,7 +46,6 @@ import java.time.Duration; import java.time.ZonedDateTime; import java.util.Collections; import java.util.Locale; -import java.util.Optional; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; @@ -62,7 +60,6 @@ public class AutodetectCommunicator implements Closeable { private static final Duration FLUSH_PROCESS_CHECK_FREQUENCY = Duration.ofSeconds(1); private final Job job; - private final Environment environment; private final AutodetectProcess autodetectProcess; private final StateStreamer stateStreamer; private final DataCountsReporter dataCountsReporter; @@ -74,12 +71,11 @@ public class AutodetectCommunicator implements Closeable { private volatile CategorizationAnalyzer categorizationAnalyzer; private volatile boolean processKilled; - AutodetectCommunicator(Job job, Environment environment, AutodetectProcess process, StateStreamer stateStreamer, + AutodetectCommunicator(Job job, AutodetectProcess process, StateStreamer stateStreamer, DataCountsReporter dataCountsReporter, AutodetectResultProcessor autodetectResultProcessor, BiConsumer onFinishHandler, NamedXContentRegistry xContentRegistry, ExecutorService autodetectWorkerExecutor) { this.job = job; - this.environment = environment; this.autodetectProcess = process; this.stateStreamer = stateStreamer; this.dataCountsReporter = dataCountsReporter; @@ -95,9 +91,9 @@ public class AutodetectCommunicator implements Closeable { autodetectProcess.restoreState(stateStreamer, modelSnapshot); } - private DataToProcessWriter createProcessWriter(Optional dataDescription) { + private DataToProcessWriter createProcessWriter(DataDescription dataDescription) { return DataToProcessWriterFactory.create(true, includeTokensField, autodetectProcess, - dataDescription.orElse(job.getDataDescription()), job.getAnalysisConfig(), + dataDescription, job.getAnalysisConfig(), dataCountsReporter, xContentRegistry); } @@ -106,7 +102,7 @@ public class AutodetectCommunicator implements Closeable { * can be used */ public void writeHeader() throws IOException { - createProcessWriter(Optional.empty()).writeHeader(); + createProcessWriter(job.getDataDescription()).writeHeader(); } /** @@ -120,7 +116,7 @@ public class AutodetectCommunicator implements Closeable { } CountingInputStream countingStream = new CountingInputStream(inputStream, dataCountsReporter); - DataToProcessWriter autodetectWriter = createProcessWriter(params.getDataDescription()); + DataToProcessWriter autodetectWriter = createProcessWriter(params.getDataDescription().orElse(job.getDataDescription())); if (includeTokensField && categorizationAnalyzer == null) { createCategorizationAnalyzer(analysisRegistry); @@ -148,7 +144,7 @@ public class AutodetectCommunicator implements Closeable { } @Override - public void close() throws IOException { + public void close() { close(false, null); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java index 60038a9641e..9b99e97d1df 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java @@ -27,7 +27,6 @@ import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.XContentElasticsearchExtension; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.core.internal.io.IOUtils; -import org.elasticsearch.env.Environment; import org.elasticsearch.index.analysis.AnalysisRegistry; import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask; import org.elasticsearch.rest.RestStatus; @@ -94,7 +93,6 @@ public class AutodetectProcessManager implements ClusterStateListener { private static final Logger logger = LogManager.getLogger(AutodetectProcessManager.class); private final Client client; - private final Environment environment; private final ThreadPool threadPool; private final JobManager jobManager; private final JobResultsProvider jobResultsProvider; @@ -117,13 +115,12 @@ public class AutodetectProcessManager implements ClusterStateListener { private volatile boolean upgradeInProgress; - public AutodetectProcessManager(Environment environment, Settings settings, Client client, ThreadPool threadPool, + public AutodetectProcessManager(Settings settings, Client client, ThreadPool threadPool, NamedXContentRegistry xContentRegistry, AnomalyDetectionAuditor auditor, ClusterService clusterService, JobManager jobManager, JobResultsProvider jobResultsProvider, JobResultsPersister jobResultsPersister, JobDataCountsPersister jobDataCountsPersister, AnnotationPersister annotationPersister, AutodetectProcessFactory autodetectProcessFactory, NormalizerFactory normalizerFactory, NativeStorageProvider nativeStorageProvider, IndexNameExpressionResolver expressionResolver) { - this.environment = environment; this.client = client; this.threadPool = threadPool; this.xContentRegistry = xContentRegistry; @@ -339,7 +336,8 @@ public class AutodetectProcessManager implements ClusterStateListener { jobManager.getJob(jobTask.getJobId(), new ActionListener() { @Override public void onResponse(Job job) { - DataCounts dataCounts = getStatistics(jobTask).get().v1(); + Optional>> stats = getStatistics(jobTask); + DataCounts dataCounts = stats.isPresent() ? stats.get().v1() : new DataCounts(job.getId()); ScheduledEventsQueryBuilder query = new ScheduledEventsQueryBuilder() .start(job.earliestValidTimestamp(dataCounts)); jobResultsProvider.scheduledEventsForJob(jobTask.getJobId(), job.getGroups(), query, eventsListener); @@ -532,7 +530,7 @@ public class AutodetectProcessManager implements ClusterStateListener { } throw e; } - return new AutodetectCommunicator(job, environment, process, new StateStreamer(client), dataCountsReporter, processor, handler, + return new AutodetectCommunicator(job, process, new StateStreamer(client), dataCountsReporter, processor, handler, xContentRegistry, autodetectWorkerExecutor); } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicatorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicatorTests.java index e9fbaf4bbb0..465a9501e6d 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicatorTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicatorTests.java @@ -64,15 +64,13 @@ import static org.mockito.Mockito.when; public class AutodetectCommunicatorTests extends ESTestCase { - private Environment environment; private AnalysisRegistry analysisRegistry; private StateStreamer stateStreamer; @Before public void setup() throws Exception { Settings settings = Settings.builder().put(Environment.PATH_HOME_SETTING.getKey(), createTempDir()).build(); - environment = TestEnvironment.newEnvironment(settings); - analysisRegistry = CategorizationAnalyzerTests.buildTestAnalysisRegistry(environment); + analysisRegistry = CategorizationAnalyzerTests.buildTestAnalysisRegistry(TestEnvironment.newEnvironment(settings)); stateStreamer = mock(StateStreamer.class); } @@ -236,7 +234,7 @@ public class AutodetectCommunicatorTests extends ESTestCase { BiConsumer finishHandler) throws IOException { DataCountsReporter dataCountsReporter = mock(DataCountsReporter.class); doNothing().when(dataCountsReporter).finishReporting(); - return new AutodetectCommunicator(createJobDetails(), environment, autodetectProcess, + return new AutodetectCommunicator(createJobDetails(), autodetectProcess, stateStreamer, dataCountsReporter, autodetectResultProcessor, finishHandler, new NamedXContentRegistry(Collections.emptyList()), executorService); } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java index b75b7f0a96b..38782ce9852 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java @@ -117,7 +117,6 @@ import static org.mockito.Mockito.spy; */ public class AutodetectProcessManagerTests extends ESTestCase { - private Environment environment; private Client client; private ThreadPool threadPool; private AnalysisRegistry analysisRegistry; @@ -143,14 +142,13 @@ public class AutodetectProcessManagerTests extends ESTestCase { @Before public void setup() throws Exception { Settings settings = Settings.builder().put(Environment.PATH_HOME_SETTING.getKey(), createTempDir()).build(); - environment = TestEnvironment.newEnvironment(settings); client = mock(Client.class); threadPool = mock(ThreadPool.class); when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY)); when(threadPool.executor(anyString())).thenReturn(EsExecutors.newDirectExecutorService()); - analysisRegistry = CategorizationAnalyzerTests.buildTestAnalysisRegistry(environment); + analysisRegistry = CategorizationAnalyzerTests.buildTestAnalysisRegistry(TestEnvironment.newEnvironment(settings)); jobManager = mock(JobManager.class); jobResultsProvider = mock(JobResultsProvider.class); jobResultsPersister = mock(JobResultsPersister.class); @@ -711,7 +709,7 @@ public class AutodetectProcessManagerTests extends ESTestCase { } private AutodetectProcessManager createManager(Settings settings) { - return new AutodetectProcessManager(environment, settings, + return new AutodetectProcessManager(settings, client, threadPool, new NamedXContentRegistry(Collections.emptyList()), auditor, clusterService, jobManager, jobResultsProvider, jobResultsPersister, jobDataCountsPersister, annotationPersister, autodetectFactory, normalizerFactory, nativeStorageProvider, new IndexNameExpressionResolver());