Removed forgotten blocking call when opening a job.
Original commit: elastic/x-pack-elasticsearch@e1dfa54240
This commit is contained in:
parent
9b0344cd90
commit
a7d95951a6
|
@ -51,10 +51,8 @@ import java.util.Optional;
|
|||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
public class AutodetectProcessManager extends AbstractComponent {
|
||||
|
@ -183,9 +181,10 @@ public class AutodetectProcessManager extends AbstractComponent {
|
|||
}
|
||||
|
||||
public void openJob(String jobId, boolean ignoreDowntime, Consumer<Exception> handler) {
|
||||
gatherRequiredInformation(jobId, (modelSnapshot, quantiles, filters) -> {
|
||||
gatherRequiredInformation(jobId, (dataCounts, modelSnapshot, quantiles, filters) -> {
|
||||
autoDetectCommunicatorByJob.computeIfAbsent(jobId, id -> {
|
||||
AutodetectCommunicator communicator = create(id, modelSnapshot, quantiles, filters, ignoreDowntime, handler);
|
||||
AutodetectCommunicator communicator =
|
||||
create(id, dataCounts, modelSnapshot, quantiles, filters, ignoreDowntime, handler);
|
||||
try {
|
||||
communicator.writeJobInputHeader();
|
||||
} catch (IOException ioe) {
|
||||
|
@ -199,25 +198,30 @@ public class AutodetectProcessManager extends AbstractComponent {
|
|||
}, handler);
|
||||
}
|
||||
|
||||
void gatherRequiredInformation(String jobId, TriConsumer handler, Consumer<Exception> errorHandler) {
|
||||
// TODO: add a method on JobProvider that fetches all required info via 1 msearch call, so that we have a single lambda
|
||||
// instead of 4 nested lambdas.
|
||||
void gatherRequiredInformation(String jobId, MultiConsumer handler, Consumer<Exception> errorHandler) {
|
||||
Job job = jobManager.getJobOrThrowIfUnknown(jobId);
|
||||
jobProvider.modelSnapshots(jobId, 0, 1, page -> {
|
||||
ModelSnapshot modelSnapshot = page.results().isEmpty() ? null : page.results().get(0);
|
||||
jobProvider.getQuantiles(jobId, quantiles -> {
|
||||
Set<String> ids = job.getAnalysisConfig().extractReferencedFilters();
|
||||
jobProvider.getFilters(filterDocument -> handler.accept(modelSnapshot, quantiles, filterDocument), errorHandler, ids);
|
||||
jobProvider.dataCounts(jobId, dataCounts -> {
|
||||
jobProvider.modelSnapshots(jobId, 0, 1, page -> {
|
||||
ModelSnapshot modelSnapshot = page.results().isEmpty() ? null : page.results().get(0);
|
||||
jobProvider.getQuantiles(jobId, quantiles -> {
|
||||
Set<String> ids = job.getAnalysisConfig().extractReferencedFilters();
|
||||
jobProvider.getFilters(filterDocument -> handler.accept(dataCounts, modelSnapshot, quantiles, filterDocument),
|
||||
errorHandler, ids);
|
||||
}, errorHandler);
|
||||
}, errorHandler);
|
||||
}, errorHandler);
|
||||
}
|
||||
|
||||
interface TriConsumer {
|
||||
interface MultiConsumer {
|
||||
|
||||
void accept(ModelSnapshot modelSnapshot, Quantiles quantiles, Set<MlFilter> filters);
|
||||
void accept(DataCounts dataCounts, ModelSnapshot modelSnapshot, Quantiles quantiles, Set<MlFilter> filters);
|
||||
|
||||
}
|
||||
|
||||
AutodetectCommunicator create(String jobId, ModelSnapshot modelSnapshot, Quantiles quantiles, Set<MlFilter> filters,
|
||||
boolean ignoreDowntime, Consumer<Exception> handler) {
|
||||
AutodetectCommunicator create(String jobId, DataCounts dataCounts, ModelSnapshot modelSnapshot, Quantiles quantiles,
|
||||
Set<MlFilter> filters, boolean ignoreDowntime, Consumer<Exception> handler) {
|
||||
if (autoDetectCommunicatorByJob.size() == maxAllowedRunningJobs) {
|
||||
throw new ElasticsearchStatusException("max running job capacity [" + maxAllowedRunningJobs + "] reached",
|
||||
RestStatus.CONFLICT);
|
||||
|
@ -229,7 +233,7 @@ public class AutodetectProcessManager extends AbstractComponent {
|
|||
// A TP with no queue, so that we fail immediately if there are no threads available
|
||||
ExecutorService executorService = threadPool.executor(MlPlugin.AUTODETECT_PROCESS_THREAD_POOL_NAME);
|
||||
|
||||
try (DataCountsReporter dataCountsReporter = new DataCountsReporter(threadPool, settings, job.getId(), fetchDataCounts(jobId),
|
||||
try (DataCountsReporter dataCountsReporter = new DataCountsReporter(threadPool, settings, job.getId(), dataCounts,
|
||||
jobDataCountsPersister)) {
|
||||
ScoresUpdater scoresUpdator = new ScoresUpdater(job, jobProvider, new JobRenormalizedResultsPersister(settings, client),
|
||||
normalizerFactory);
|
||||
|
@ -253,28 +257,6 @@ public class AutodetectProcessManager extends AbstractComponent {
|
|||
}
|
||||
}
|
||||
|
||||
private DataCounts fetchDataCounts(String jobId) {
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
AtomicReference<DataCounts> holder = new AtomicReference<>();
|
||||
AtomicReference<Exception> errorHolder = new AtomicReference<>();
|
||||
jobProvider.dataCounts(jobId, dataCounts -> {
|
||||
holder.set(dataCounts);
|
||||
latch.countDown();
|
||||
}, e -> {
|
||||
errorHolder.set(e);
|
||||
latch.countDown();
|
||||
});
|
||||
try {
|
||||
latch.await();
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
if (errorHolder.get() != null) {
|
||||
throw org.elasticsearch.ExceptionsHelper.convertToElastic(errorHolder.get());
|
||||
}
|
||||
return holder.get();
|
||||
}
|
||||
|
||||
/**
|
||||
* Stop the running job and mark it as finished.<br>
|
||||
* @param jobId The job to stop
|
||||
|
|
|
@ -80,6 +80,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
|
|||
private JobDataCountsPersister jobDataCountsPersister;
|
||||
private NormalizerFactory normalizerFactory;
|
||||
|
||||
private DataCounts dataCounts = new DataCounts("foo");
|
||||
private ModelSnapshot modelSnapshot = new ModelSnapshot("foo");
|
||||
private Quantiles quantiles = new Quantiles("foo", new Date(), "state");
|
||||
private Set<MlFilter> filters = new HashSet<>();
|
||||
|
@ -94,6 +95,12 @@ public class AutodetectProcessManagerTests extends ESTestCase {
|
|||
givenAllocationWithState(JobState.OPENED);
|
||||
|
||||
when(jobManager.getJobOrThrowIfUnknown("foo")).thenReturn(createJobDetails("foo"));
|
||||
doAnswer(invocationOnMock -> {
|
||||
@SuppressWarnings("unchecked")
|
||||
Consumer<DataCounts> handler = (Consumer<DataCounts>) invocationOnMock.getArguments()[1];
|
||||
handler.accept(dataCounts);
|
||||
return null;
|
||||
}).when(jobProvider).dataCounts(any(), any(), any());
|
||||
doAnswer(invocationOnMock -> {
|
||||
@SuppressWarnings("unchecked")
|
||||
Consumer<QueryPage<ModelSnapshot>> handler = (Consumer<QueryPage<ModelSnapshot>>) invocationOnMock.getArguments()[3];
|
||||
|
@ -168,12 +175,13 @@ public class AutodetectProcessManagerTests extends ESTestCase {
|
|||
AutodetectProcessManager manager = spy(new AutodetectProcessManager(settings.build(), client, threadPool, jobManager, jobProvider,
|
||||
jobResultsPersister, jobDataCountsPersister, parser, autodetectProcessFactory, normalizerFactory));
|
||||
|
||||
DataCounts dataCounts = new DataCounts("foo");
|
||||
ModelSnapshot modelSnapshot = new ModelSnapshot("foo");
|
||||
Quantiles quantiles = new Quantiles("foo", new Date(), "state");
|
||||
Set<MlFilter> filters = new HashSet<>();
|
||||
doAnswer(invocationOnMock -> {
|
||||
AutodetectProcessManager.TriConsumer consumer = (AutodetectProcessManager.TriConsumer) invocationOnMock.getArguments()[1];
|
||||
consumer.accept(modelSnapshot, quantiles, filters);
|
||||
AutodetectProcessManager.MultiConsumer consumer = (AutodetectProcessManager.MultiConsumer) invocationOnMock.getArguments()[1];
|
||||
consumer.accept(dataCounts, modelSnapshot, quantiles, filters);
|
||||
return null;
|
||||
}).when(manager).gatherRequiredInformation(any(), any(), any());
|
||||
|
||||
|
@ -319,7 +327,8 @@ public class AutodetectProcessManagerTests extends ESTestCase {
|
|||
AutodetectProcessManager manager = new AutodetectProcessManager(Settings.EMPTY, client, threadPool, jobManager, jobProvider,
|
||||
jobResultsPersister, jobDataCountsPersister, parser, autodetectProcessFactory, normalizerFactory);
|
||||
|
||||
expectThrows(EsRejectedExecutionException.class, () -> manager.create("my_id", modelSnapshot, quantiles, filters, false, e -> {}));
|
||||
expectThrows(EsRejectedExecutionException.class,
|
||||
() -> manager.create("my_id", dataCounts, modelSnapshot, quantiles, filters, false, e -> {}));
|
||||
verify(autodetectProcess, times(1)).close();
|
||||
}
|
||||
|
||||
|
@ -341,7 +350,8 @@ public class AutodetectProcessManagerTests extends ESTestCase {
|
|||
AutodetectProcessManager manager = new AutodetectProcessManager(Settings.EMPTY, client, threadPool, jobManager, jobProvider,
|
||||
jobResultsPersister, jobDataCountsPersister, parser, autodetectProcessFactory, normalizerFactory);
|
||||
manager = spy(manager);
|
||||
doReturn(communicator).when(manager).create(any(), eq(modelSnapshot), eq(quantiles), eq(filters), anyBoolean(), any());
|
||||
doReturn(communicator).when(manager)
|
||||
.create(any(), eq(dataCounts), eq(modelSnapshot), eq(quantiles), eq(filters), anyBoolean(), any());
|
||||
return manager;
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue