[7.x] Remove unused environment from anomaly detector classes (#54399) (#54456)

This commit is contained in:
David Kyle 2020-03-31 16:55:37 +01:00 committed by GitHub
parent e4230c533c
commit 9150e77269
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 19 additions and 33 deletions

View File

@ -602,7 +602,7 @@ public class MachineLearning extends Plugin implements SystemIndexPlugin, Analys
}
NormalizerFactory normalizerFactory = new NormalizerFactory(normalizerProcessFactory,
threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME));
AutodetectProcessManager autodetectProcessManager = new AutodetectProcessManager(environment, settings, client, threadPool,
AutodetectProcessManager autodetectProcessManager = new AutodetectProcessManager(settings, client, threadPool,
xContentRegistry, anomalyDetectionAuditor, clusterService, jobManager, jobResultsProvider, jobResultsPersister,
jobDataCountsPersister, anomalyDetectionAnnotationPersister, autodetectProcessFactory, normalizerFactory,
nativeStorageProvider, indexNameExpressionResolver);

View File

@ -88,7 +88,6 @@ public class JobManager {
private static final Logger logger = LogManager.getLogger(JobManager.class);
private static final DeprecationLogger deprecationLogger = new DeprecationLogger(logger);
private final Environment environment;
private final JobResultsProvider jobResultsProvider;
private final JobResultsPersister jobResultsPersister;
private final ClusterService clusterService;
@ -108,7 +107,6 @@ public class JobManager {
JobResultsPersister jobResultsPersister, ClusterService clusterService, AnomalyDetectionAuditor auditor,
ThreadPool threadPool, Client client, UpdateJobProcessNotifier updateJobProcessNotifier,
NamedXContentRegistry xContentRegistry) {
this.environment = environment;
this.jobResultsProvider = Objects.requireNonNull(jobResultsProvider);
this.jobResultsPersister = Objects.requireNonNull(jobResultsPersister);
this.clusterService = Objects.requireNonNull(clusterService);
@ -227,7 +225,7 @@ public class JobManager {
* The overall structure can be validated at parse time, but the exact names need to be checked separately,
* as plugins that provide the functionality can be installed/uninstalled.
*/
static void validateCategorizationAnalyzer(Job.Builder jobBuilder, AnalysisRegistry analysisRegistry, Environment environment)
static void validateCategorizationAnalyzer(Job.Builder jobBuilder, AnalysisRegistry analysisRegistry)
throws IOException {
CategorizationAnalyzerConfig categorizationAnalyzerConfig = jobBuilder.getAnalysisConfig().getCategorizationAnalyzerConfig();
if (categorizationAnalyzerConfig != null) {
@ -243,7 +241,7 @@ public class JobManager {
ActionListener<PutJobAction.Response> actionListener) throws IOException {
request.getJobBuilder().validateAnalysisLimitsAndSetDefaults(maxModelMemoryLimit);
validateCategorizationAnalyzer(request.getJobBuilder(), analysisRegistry, environment);
validateCategorizationAnalyzer(request.getJobBuilder(), analysisRegistry);
Job job = request.getJobBuilder().build(new Date());
@ -331,9 +329,7 @@ public class JobManager {
);
ActionListener<Boolean> checkNoGroupWithTheJobId = ActionListener.wrap(
ok -> {
jobConfigProvider.groupExists(job.getId(), checkNoJobsWithGroupId);
},
ok -> jobConfigProvider.groupExists(job.getId(), checkNoJobsWithGroupId),
actionListener::onFailure
);
@ -400,7 +396,7 @@ public class JobManager {
));
}
} else {
logger.debug("[{}] No process update required for job update: {}", () -> request.getJobId(), () -> {
logger.debug("[{}] No process update required for job update: {}", request::getJobId, () -> {
try {
XContentBuilder jsonBuilder = XContentFactory.jsonBuilder();
request.getJobUpdate().toXContent(jsonBuilder, ToXContent.EMPTY_PARAMS);

View File

@ -15,7 +15,6 @@ import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.env.Environment;
import org.elasticsearch.index.analysis.AnalysisRegistry;
import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig;
import org.elasticsearch.xpack.core.ml.job.config.CategorizationAnalyzerConfig;
@ -47,7 +46,6 @@ import java.time.Duration;
import java.time.ZonedDateTime;
import java.util.Collections;
import java.util.Locale;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
@ -62,7 +60,6 @@ public class AutodetectCommunicator implements Closeable {
private static final Duration FLUSH_PROCESS_CHECK_FREQUENCY = Duration.ofSeconds(1);
private final Job job;
private final Environment environment;
private final AutodetectProcess autodetectProcess;
private final StateStreamer stateStreamer;
private final DataCountsReporter dataCountsReporter;
@ -74,12 +71,11 @@ public class AutodetectCommunicator implements Closeable {
private volatile CategorizationAnalyzer categorizationAnalyzer;
private volatile boolean processKilled;
AutodetectCommunicator(Job job, Environment environment, AutodetectProcess process, StateStreamer stateStreamer,
AutodetectCommunicator(Job job, AutodetectProcess process, StateStreamer stateStreamer,
DataCountsReporter dataCountsReporter, AutodetectResultProcessor autodetectResultProcessor,
BiConsumer<Exception, Boolean> onFinishHandler, NamedXContentRegistry xContentRegistry,
ExecutorService autodetectWorkerExecutor) {
this.job = job;
this.environment = environment;
this.autodetectProcess = process;
this.stateStreamer = stateStreamer;
this.dataCountsReporter = dataCountsReporter;
@ -95,9 +91,9 @@ public class AutodetectCommunicator implements Closeable {
autodetectProcess.restoreState(stateStreamer, modelSnapshot);
}
private DataToProcessWriter createProcessWriter(Optional<DataDescription> dataDescription) {
private DataToProcessWriter createProcessWriter(DataDescription dataDescription) {
return DataToProcessWriterFactory.create(true, includeTokensField, autodetectProcess,
dataDescription.orElse(job.getDataDescription()), job.getAnalysisConfig(),
dataDescription, job.getAnalysisConfig(),
dataCountsReporter, xContentRegistry);
}
@ -106,7 +102,7 @@ public class AutodetectCommunicator implements Closeable {
* can be used
*/
public void writeHeader() throws IOException {
createProcessWriter(Optional.empty()).writeHeader();
createProcessWriter(job.getDataDescription()).writeHeader();
}
/**
@ -120,7 +116,7 @@ public class AutodetectCommunicator implements Closeable {
}
CountingInputStream countingStream = new CountingInputStream(inputStream, dataCountsReporter);
DataToProcessWriter autodetectWriter = createProcessWriter(params.getDataDescription());
DataToProcessWriter autodetectWriter = createProcessWriter(params.getDataDescription().orElse(job.getDataDescription()));
if (includeTokensField && categorizationAnalyzer == null) {
createCategorizationAnalyzer(analysisRegistry);
@ -148,7 +144,7 @@ public class AutodetectCommunicator implements Closeable {
}
@Override
public void close() throws IOException {
public void close() {
close(false, null);
}

View File

@ -27,7 +27,6 @@ import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentElasticsearchExtension;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.env.Environment;
import org.elasticsearch.index.analysis.AnalysisRegistry;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask;
import org.elasticsearch.rest.RestStatus;
@ -94,7 +93,6 @@ public class AutodetectProcessManager implements ClusterStateListener {
private static final Logger logger = LogManager.getLogger(AutodetectProcessManager.class);
private final Client client;
private final Environment environment;
private final ThreadPool threadPool;
private final JobManager jobManager;
private final JobResultsProvider jobResultsProvider;
@ -117,13 +115,12 @@ public class AutodetectProcessManager implements ClusterStateListener {
private volatile boolean upgradeInProgress;
public AutodetectProcessManager(Environment environment, Settings settings, Client client, ThreadPool threadPool,
public AutodetectProcessManager(Settings settings, Client client, ThreadPool threadPool,
NamedXContentRegistry xContentRegistry, AnomalyDetectionAuditor auditor, ClusterService clusterService,
JobManager jobManager, JobResultsProvider jobResultsProvider, JobResultsPersister jobResultsPersister,
JobDataCountsPersister jobDataCountsPersister, AnnotationPersister annotationPersister,
AutodetectProcessFactory autodetectProcessFactory, NormalizerFactory normalizerFactory,
NativeStorageProvider nativeStorageProvider, IndexNameExpressionResolver expressionResolver) {
this.environment = environment;
this.client = client;
this.threadPool = threadPool;
this.xContentRegistry = xContentRegistry;
@ -339,7 +336,8 @@ public class AutodetectProcessManager implements ClusterStateListener {
jobManager.getJob(jobTask.getJobId(), new ActionListener<Job>() {
@Override
public void onResponse(Job job) {
DataCounts dataCounts = getStatistics(jobTask).get().v1();
Optional<Tuple<DataCounts, Tuple<ModelSizeStats, TimingStats>>> stats = getStatistics(jobTask);
DataCounts dataCounts = stats.isPresent() ? stats.get().v1() : new DataCounts(job.getId());
ScheduledEventsQueryBuilder query = new ScheduledEventsQueryBuilder()
.start(job.earliestValidTimestamp(dataCounts));
jobResultsProvider.scheduledEventsForJob(jobTask.getJobId(), job.getGroups(), query, eventsListener);
@ -532,7 +530,7 @@ public class AutodetectProcessManager implements ClusterStateListener {
}
throw e;
}
return new AutodetectCommunicator(job, environment, process, new StateStreamer(client), dataCountsReporter, processor, handler,
return new AutodetectCommunicator(job, process, new StateStreamer(client), dataCountsReporter, processor, handler,
xContentRegistry, autodetectWorkerExecutor);
}

View File

@ -64,15 +64,13 @@ import static org.mockito.Mockito.when;
public class AutodetectCommunicatorTests extends ESTestCase {
private Environment environment;
private AnalysisRegistry analysisRegistry;
private StateStreamer stateStreamer;
@Before
public void setup() throws Exception {
Settings settings = Settings.builder().put(Environment.PATH_HOME_SETTING.getKey(), createTempDir()).build();
environment = TestEnvironment.newEnvironment(settings);
analysisRegistry = CategorizationAnalyzerTests.buildTestAnalysisRegistry(environment);
analysisRegistry = CategorizationAnalyzerTests.buildTestAnalysisRegistry(TestEnvironment.newEnvironment(settings));
stateStreamer = mock(StateStreamer.class);
}
@ -236,7 +234,7 @@ public class AutodetectCommunicatorTests extends ESTestCase {
BiConsumer<Exception, Boolean> finishHandler) throws IOException {
DataCountsReporter dataCountsReporter = mock(DataCountsReporter.class);
doNothing().when(dataCountsReporter).finishReporting();
return new AutodetectCommunicator(createJobDetails(), environment, autodetectProcess,
return new AutodetectCommunicator(createJobDetails(), autodetectProcess,
stateStreamer, dataCountsReporter, autodetectResultProcessor, finishHandler,
new NamedXContentRegistry(Collections.emptyList()), executorService);
}

View File

@ -117,7 +117,6 @@ import static org.mockito.Mockito.spy;
*/
public class AutodetectProcessManagerTests extends ESTestCase {
private Environment environment;
private Client client;
private ThreadPool threadPool;
private AnalysisRegistry analysisRegistry;
@ -143,14 +142,13 @@ public class AutodetectProcessManagerTests extends ESTestCase {
@Before
public void setup() throws Exception {
Settings settings = Settings.builder().put(Environment.PATH_HOME_SETTING.getKey(), createTempDir()).build();
environment = TestEnvironment.newEnvironment(settings);
client = mock(Client.class);
threadPool = mock(ThreadPool.class);
when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY));
when(threadPool.executor(anyString())).thenReturn(EsExecutors.newDirectExecutorService());
analysisRegistry = CategorizationAnalyzerTests.buildTestAnalysisRegistry(environment);
analysisRegistry = CategorizationAnalyzerTests.buildTestAnalysisRegistry(TestEnvironment.newEnvironment(settings));
jobManager = mock(JobManager.class);
jobResultsProvider = mock(JobResultsProvider.class);
jobResultsPersister = mock(JobResultsPersister.class);
@ -711,7 +709,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
}
private AutodetectProcessManager createManager(Settings settings) {
return new AutodetectProcessManager(environment, settings,
return new AutodetectProcessManager(settings,
client, threadPool, new NamedXContentRegistry(Collections.emptyList()), auditor, clusterService, jobManager, jobResultsProvider,
jobResultsPersister, jobDataCountsPersister, annotationPersister, autodetectFactory, normalizerFactory, nativeStorageProvider,
new IndexNameExpressionResolver());