Make renormalization thread-safe (elastic/elasticsearch#840)

Each ScoresUpdater needs its own JobRenormalizedResultsPersister, because
each JobRenormalizedResultsPersister has a single BulkRequest that various
methods update.

Fixes elastic/elasticsearch#838

Original commit: elastic/x-pack-elasticsearch@90f4bbd5a0
This commit is contained in:
David Roberts 2017-01-31 16:51:26 +00:00 committed by GitHub
parent f804bb1917
commit 7ff3b707a8
4 changed files with 8 additions and 16 deletions

View File

@ -203,7 +203,6 @@ public class MlPlugin extends Plugin implements ActionPlugin {
}
JobResultsPersister jobResultsPersister = new JobResultsPersister(settings, client);
JobProvider jobProvider = new JobProvider(client, 0);
JobRenormalizedResultsPersister jobRenormalizedResultsPersister = new JobRenormalizedResultsPersister(settings, client);
JobDataCountsPersister jobDataCountsPersister = new JobDataCountsPersister(settings, client);
JobManager jobManager = new JobManager(settings, jobProvider, jobResultsPersister, clusterService);
@ -229,8 +228,7 @@ public class MlPlugin extends Plugin implements ActionPlugin {
threadPool.executor(MlPlugin.THREAD_POOL_NAME));
AutodetectResultsParser autodetectResultsParser = new AutodetectResultsParser(settings);
AutodetectProcessManager dataProcessor = new AutodetectProcessManager(settings, client, threadPool, jobManager, jobProvider,
jobResultsPersister, jobRenormalizedResultsPersister, jobDataCountsPersister, autodetectResultsParser,
autodetectProcessFactory, normalizerFactory);
jobResultsPersister, jobDataCountsPersister, autodetectResultsParser, autodetectProcessFactory, normalizerFactory);
DatafeedJobRunner datafeedJobRunner = new DatafeedJobRunner(threadPool, client, clusterService, jobProvider,
System::currentTimeMillis);
PersistentActionService persistentActionService = new PersistentActionService(Settings.EMPTY, clusterService, client);

View File

@ -31,6 +31,8 @@ import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
* unusual scores.
* <p>
* Renormalized results must already have an ID.
* <p>
* This class is NOT thread safe.
*/
public class JobRenormalizedResultsPersister extends AbstractComponent {

View File

@ -73,7 +73,6 @@ public class AutodetectProcessManager extends AbstractComponent {
private final StateProcessor stateProcessor;
private final JobResultsPersister jobResultsPersister;
private final JobRenormalizedResultsPersister jobRenormalizedResultsPersister;
private final JobDataCountsPersister jobDataCountsPersister;
private final ConcurrentMap<String, AutodetectCommunicator> autoDetectCommunicatorByJob;
@ -82,7 +81,6 @@ public class AutodetectProcessManager extends AbstractComponent {
public AutodetectProcessManager(Settings settings, Client client, ThreadPool threadPool, JobManager jobManager,
JobProvider jobProvider, JobResultsPersister jobResultsPersister,
JobRenormalizedResultsPersister jobRenormalizedResultsPersister,
JobDataCountsPersister jobDataCountsPersister, AutodetectResultsParser parser,
AutodetectProcessFactory autodetectProcessFactory, NormalizerFactory normalizerFactory) {
super(settings);
@ -96,7 +94,6 @@ public class AutodetectProcessManager extends AbstractComponent {
this.jobProvider = jobProvider;
this.jobResultsPersister = jobResultsPersister;
this.jobRenormalizedResultsPersister = jobRenormalizedResultsPersister;
this.stateProcessor = new StateProcessor(settings, jobResultsPersister);
this.jobDataCountsPersister = jobDataCountsPersister;
@ -233,7 +230,8 @@ public class AutodetectProcessManager extends AbstractComponent {
try (DataCountsReporter dataCountsReporter = new DataCountsReporter(threadPool, settings, job.getId(), fetchDataCounts(jobId),
jobDataCountsPersister)) {
ScoresUpdater scoresUpdator = new ScoresUpdater(job, jobProvider, jobRenormalizedResultsPersister, normalizerFactory);
ScoresUpdater scoresUpdator = new ScoresUpdater(job, jobProvider, new JobRenormalizedResultsPersister(settings, client),
normalizerFactory);
Renormalizer renormalizer = new ShortCircuitingRenormalizer(jobId, scoresUpdator,
threadPool.executor(MlPlugin.THREAD_POOL_NAME), job.getAnalysisConfig().getUsePerPartitionNormalization());
AutoDetectResultProcessor processor = new AutoDetectResultProcessor(renormalizer, jobResultsPersister, parser);

View File

@ -24,7 +24,6 @@ import org.elasticsearch.xpack.ml.job.config.MlFilter;
import org.elasticsearch.xpack.ml.job.metadata.Allocation;
import org.elasticsearch.xpack.ml.job.persistence.JobDataCountsPersister;
import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
import org.elasticsearch.xpack.ml.job.persistence.JobRenormalizedResultsPersister;
import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister;
import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutodetectResultsParser;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams;
@ -76,7 +75,6 @@ public class AutodetectProcessManagerTests extends ESTestCase {
private JobManager jobManager;
private JobProvider jobProvider;
private JobResultsPersister jobResultsPersister;
private JobRenormalizedResultsPersister jobRenormalizedResultsPersister;
private JobDataCountsPersister jobDataCountsPersister;
private NormalizerFactory normalizerFactory;
@ -85,7 +83,6 @@ public class AutodetectProcessManagerTests extends ESTestCase {
jobManager = mock(JobManager.class);
jobProvider = mock(JobProvider.class);
jobResultsPersister = mock(JobResultsPersister.class);
jobRenormalizedResultsPersister = mock(JobRenormalizedResultsPersister.class);
jobDataCountsPersister = mock(JobDataCountsPersister.class);
normalizerFactory = mock(NormalizerFactory.class);
givenAllocationWithStatus(JobStatus.OPENED);
@ -143,8 +140,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
Settings.Builder settings = Settings.builder();
settings.put(AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE.getKey(), 3);
AutodetectProcessManager manager = spy(new AutodetectProcessManager(settings.build(), client, threadPool, jobManager, jobProvider,
jobResultsPersister, jobRenormalizedResultsPersister, jobDataCountsPersister, parser, autodetectProcessFactory,
normalizerFactory));
jobResultsPersister, jobDataCountsPersister, parser, autodetectProcessFactory, normalizerFactory));
ModelSnapshot modelSnapshot = new ModelSnapshot("foo");
Quantiles quantiles = new Quantiles("foo", new Date(), "state");
@ -300,8 +296,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
AutodetectProcess autodetectProcess = mock(AutodetectProcess.class);
AutodetectProcessFactory autodetectProcessFactory = (j, modelSnapshot, quantiles, filters, i, e) -> autodetectProcess;
AutodetectProcessManager manager = spy(new AutodetectProcessManager(Settings.EMPTY, client, threadPool, jobManager, jobProvider,
jobResultsPersister, jobRenormalizedResultsPersister, jobDataCountsPersister, parser, autodetectProcessFactory,
normalizerFactory));
jobResultsPersister, jobDataCountsPersister, parser, autodetectProcessFactory, normalizerFactory));
ModelSnapshot modelSnapshot = new ModelSnapshot("foo");
Quantiles quantiles = new Quantiles("foo", new Date(), "state");
Set<MlFilter> filters = new HashSet<>();
@ -331,8 +326,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
AutodetectResultsParser parser = mock(AutodetectResultsParser.class);
AutodetectProcessFactory autodetectProcessFactory = mock(AutodetectProcessFactory.class);
AutodetectProcessManager manager = new AutodetectProcessManager(Settings.EMPTY, client, threadPool, jobManager, jobProvider,
jobResultsPersister, jobRenormalizedResultsPersister, jobDataCountsPersister, parser, autodetectProcessFactory,
normalizerFactory);
jobResultsPersister, jobDataCountsPersister, parser, autodetectProcessFactory, normalizerFactory);
manager = spy(manager);
ModelSnapshot modelSnapshot = new ModelSnapshot("foo");
Quantiles quantiles = new Quantiles("foo", new Date(), "state");