Made client calls non blocking for JobProvider#getDataCounts(..)

Original commit: elastic/x-pack-elasticsearch@4d6d6360f6
This commit is contained in:
Martijn van Groningen 2017-01-06 11:41:03 +01:00
parent 468402426e
commit 092d2e2bdc
7 changed files with 146 additions and 86 deletions

View File

@ -22,6 +22,7 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.common.xcontent.StatusToXContent;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
@ -41,10 +42,12 @@ import org.elasticsearch.xpack.prelert.job.persistence.QueryPage;
import org.elasticsearch.xpack.prelert.utils.ExceptionsHelper;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.stream.Collectors;
public class GetJobsStatsAction extends Action<GetJobsStatsAction.Request, GetJobsStatsAction.Response, GetJobsStatsAction.RequestBuilder> {
@ -297,24 +300,36 @@ public class GetJobsStatsAction extends Action<GetJobsStatsAction.Request, GetJo
@Override
protected void doExecute(Request request, ActionListener<Response> listener) {
logger.debug("Get stats for job '{}'", request.getJobId());
List<Response.JobStats> jobsStats = new ArrayList<>();
QueryPage<Job> jobs = jobManager.getJob(request.getJobId(), clusterService.state());
PrelertMetadata prelertMetadata = clusterService.state().metaData().custom(PrelertMetadata.TYPE);
for (Job job : jobs.results()) {
DataCounts dataCounts = readDataCounts(job.getId());
AtomicInteger counter = new AtomicInteger(0);
AtomicArray<Response.JobStats> jobsStats = new AtomicArray<>(jobs.results().size());
for (int i = 0; i < jobs.results().size(); i++) {
int slot = i;
Job job = jobs.results().get(slot);
readDataCounts(job.getId(), dataCounts -> {
ModelSizeStats modelSizeStats = readModelSizeStats(job.getId());
JobStatus status = prelertMetadata.getAllocations().get(job.getId()).getStatus();
jobsStats.add(new Response.JobStats(job.getId(), dataCounts, modelSizeStats, status));
}
jobsStats.setOnce(slot, new Response.JobStats(job.getId(), dataCounts, modelSizeStats, status));
QueryPage<Response.JobStats> jobsStatsPage = new QueryPage<>(jobsStats, jobsStats.size(), Job.RESULTS_FIELD);
if (counter.incrementAndGet() == jobsStats.length()) {
List<Response.JobStats> results =
jobsStats.asList().stream().map(entry -> entry.value).collect(Collectors.toList());
QueryPage<Response.JobStats> jobsStatsPage = new QueryPage<>(results, results.size(), Job.RESULTS_FIELD);
listener.onResponse(new GetJobsStatsAction.Response(jobsStatsPage));
}
}, listener::onFailure);
}
}
private DataCounts readDataCounts(String jobId) {
private void readDataCounts(String jobId, Consumer<DataCounts> handler, Consumer<Exception> errorHandler) {
Optional<DataCounts> counts = processManager.getDataCounts(jobId);
return counts.orElseGet(() -> jobProvider.dataCounts(jobId));
if (counts.isPresent()) {
handler.accept(counts.get());
} else {
jobProvider.dataCounts(jobId, handler, errorHandler);
}
}
private ModelSizeStats readModelSizeStats(String jobId) {

View File

@ -36,16 +36,15 @@ import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.prelert.job.DataCounts;
import org.elasticsearch.xpack.prelert.job.Job;
import org.elasticsearch.xpack.prelert.job.JobStatus;
import org.elasticsearch.xpack.prelert.job.ModelSnapshot;
import org.elasticsearch.xpack.prelert.job.manager.JobManager;
import org.elasticsearch.xpack.prelert.job.messages.Messages;
import org.elasticsearch.xpack.prelert.job.metadata.Allocation;
import org.elasticsearch.xpack.prelert.job.persistence.JobDataCountsPersister;
import org.elasticsearch.xpack.prelert.job.persistence.JobDataDeleterFactory;
import org.elasticsearch.xpack.prelert.job.persistence.JobProvider;
import org.elasticsearch.xpack.prelert.job.persistence.JobDataCountsPersister;
import org.elasticsearch.xpack.prelert.job.persistence.OldDataRemover;
import org.elasticsearch.xpack.prelert.job.persistence.QueryPage;
import org.elasticsearch.xpack.prelert.utils.ExceptionsHelper;
@ -419,7 +418,7 @@ extends Action<RevertModelSnapshotAction.Request, RevertModelSnapshotAction.Resp
return ActionListener.wrap(response -> {
if (response.isAcknowledged()) {
DataCounts counts = jobProvider.dataCounts(jobId);
jobProvider.dataCounts(jobId, counts -> {
counts.setLatestRecordTimeStamp(modelSnapshot.getLatestRecordTimeStamp());
jobDataCountsPersister.persistDataCounts(jobId, counts, new ActionListener<Boolean>() {
@Override
@ -432,6 +431,7 @@ extends Action<RevertModelSnapshotAction.Request, RevertModelSnapshotAction.Resp
listener.onFailure(e);
}
});
}, listener::onFailure);
}
}, listener::onFailure);
}

View File

@ -10,7 +10,6 @@ import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.rest.RestStatus;
@ -24,10 +23,10 @@ import org.elasticsearch.xpack.prelert.job.ModelSizeStats;
import org.elasticsearch.xpack.prelert.job.data.DataProcessor;
import org.elasticsearch.xpack.prelert.job.metadata.Allocation;
import org.elasticsearch.xpack.prelert.job.persistence.JobDataCountsPersister;
import org.elasticsearch.xpack.prelert.job.persistence.JobRenormalizedResultsPersister;
import org.elasticsearch.xpack.prelert.job.persistence.UsagePersister;
import org.elasticsearch.xpack.prelert.job.persistence.JobProvider;
import org.elasticsearch.xpack.prelert.job.persistence.JobRenormalizedResultsPersister;
import org.elasticsearch.xpack.prelert.job.persistence.JobResultsPersister;
import org.elasticsearch.xpack.prelert.job.persistence.UsagePersister;
import org.elasticsearch.xpack.prelert.job.process.autodetect.AutodetectCommunicator;
import org.elasticsearch.xpack.prelert.job.process.autodetect.AutodetectProcess;
import org.elasticsearch.xpack.prelert.job.process.autodetect.AutodetectProcessFactory;
@ -52,8 +51,10 @@ import java.util.Locale;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
public class AutodetectProcessManager extends AbstractComponent implements DataProcessor {
@ -190,7 +191,7 @@ public class AutodetectProcessManager extends AbstractComponent implements DataP
ExecutorService executorService = threadPool.executor(PrelertPlugin.AUTODETECT_PROCESS_THREAD_POOL_NAME);
UsageReporter usageReporter = new UsageReporter(settings, job.getId(), usagePersister);
try (StatusReporter statusReporter = new StatusReporter(threadPool, settings, job.getId(), jobProvider.dataCounts(jobId),
try (StatusReporter statusReporter = new StatusReporter(threadPool, settings, job.getId(), fetchDataCounts(jobId),
usageReporter, jobDataCountsPersister)) {
ScoresUpdater scoresUpdator = new ScoresUpdater(job, jobProvider, jobRenormalizedResultsPersister, normalizerFactory);
Renormalizer renormalizer = new ShortCircuitingRenormalizer(jobId, scoresUpdator,
@ -212,6 +213,28 @@ public class AutodetectProcessManager extends AbstractComponent implements DataP
}
}
private DataCounts fetchDataCounts(String jobId) {
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<DataCounts> holder = new AtomicReference<>();
AtomicReference<Exception> errorHolder = new AtomicReference<>();
jobProvider.dataCounts(jobId, dataCounts -> {
holder.set(dataCounts);
latch.countDown();
}, e -> {
errorHolder.set(e);
latch.countDown();
});
try {
latch.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
if (errorHolder.get() != null) {
throw org.elasticsearch.ExceptionsHelper.convertToElastic(errorHolder.get());
}
return holder.get();
}
@Override
public void closeJob(String jobId) {
logger.debug("Closing job {}", jobId);

View File

@ -338,30 +338,32 @@ public class JobProvider {
* Get the job's data counts
*
* @param jobId The job id
* @return The dataCounts or default constructed object if not found
*/
public DataCounts dataCounts(String jobId) {
public void dataCounts(String jobId, Consumer<DataCounts> handler, Consumer<Exception> errorHandler) {
String indexName = AnomalyDetectorsIndex.jobResultsIndexName(jobId);
try {
GetRequest getRequest = new GetRequest(indexName, DataCounts.TYPE.getPreferredName(), jobId + DataCounts.DOCUMENT_SUFFIX);
GetResponse response = FixBlockingClientOperations.executeBlocking(client, GetAction.INSTANCE, getRequest);
client.get(getRequest, ActionListener.wrap(
response -> {
if (response.isExists() == false) {
return new DataCounts(jobId);
handler.accept(new DataCounts(jobId));
} else {
BytesReference source = response.getSourceAsBytesRef();
XContentParser parser;
try {
parser = XContentFactory.xContent(source).createParser(NamedXContentRegistry.EMPTY, source);
return DataCounts.PARSER.apply(parser, () -> parseFieldMatcher);
handler.accept(DataCounts.PARSER.apply(parser, () -> parseFieldMatcher));
} catch (IOException e) {
throw new ElasticsearchParseException("failed to parse bucket", e);
}
}
} catch (IndexNotFoundException e) {
throw ExceptionsHelper.missingJobException(jobId);
},
e -> {
if (e instanceof IndexNotFoundException) {
errorHandler.accept(ExceptionsHelper.missingJobException(jobId));
} else {
errorHandler.accept(e);
}
}));
}
/**

View File

@ -30,12 +30,15 @@ import org.elasticsearch.xpack.prelert.job.metadata.Allocation;
import org.elasticsearch.xpack.prelert.job.metadata.PrelertMetadata;
import org.elasticsearch.xpack.prelert.job.persistence.BucketsQueryBuilder;
import org.elasticsearch.xpack.prelert.job.persistence.JobProvider;
import org.elasticsearch.xpack.prelert.job.persistence.QueryPage;
import org.elasticsearch.xpack.prelert.job.results.Bucket;
import org.elasticsearch.xpack.prelert.utils.ExceptionsHelper;
import java.time.Duration;
import java.util.Collections;
import java.util.Objects;
import java.util.concurrent.Future;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Supplier;
@ -66,27 +69,19 @@ public class ScheduledJobRunner extends AbstractComponent {
Scheduler scheduler = prelertMetadata.getScheduler(schedulerId);
Job job = prelertMetadata.getJobs().get(scheduler.getJobId());
BucketsQueryBuilder.BucketsQuery latestBucketQuery = new BucketsQueryBuilder()
.sortField(Bucket.TIMESTAMP.getPreferredName())
.sortDescending(true).size(1)
.includeInterim(false)
.build();
jobProvider.buckets(job.getId(), latestBucketQuery, buckets -> {
gatherInformation(job.getId(), (buckets, dataCounts) -> {
long latestFinalBucketEndMs = -1L;
Duration bucketSpan = Duration.ofSeconds(job.getAnalysisConfig().getBucketSpan());
if (buckets.results().size() == 1) {
latestFinalBucketEndMs = buckets.results().get(0).getTimestamp().getTime() + bucketSpan.toMillis() - 1;
}
Holder holder = createJobScheduler(scheduler, job, latestFinalBucketEndMs, handler, task);
innerRun(holder, startTime, endTime);
}, e -> {
if (e instanceof ResourceNotFoundException) {
Holder holder = createJobScheduler(scheduler, job, -1L, handler, task);
innerRun(holder, startTime, endTime);
} else {
handler.accept(e);
long latestRecordTimeMs = -1L;
if (dataCounts.getLatestRecordTimeStamp() != null) {
latestRecordTimeMs = dataCounts.getLatestRecordTimeStamp().getTime();
}
});
Holder holder = createJobScheduler(scheduler, job, latestFinalBucketEndMs, latestRecordTimeMs, handler, task);
innerRun(holder, startTime, endTime);
}, handler);
}
// Important: Holder must be created and assigned to SchedulerTask before setting status to started,
@ -192,26 +187,35 @@ public class ScheduledJobRunner extends AbstractComponent {
ScheduledJobValidator.validate(scheduler.getConfig(), job);
}
private Holder createJobScheduler(Scheduler scheduler, Job job, long latestFinalBucketEndMs, Consumer<Exception> handler,
StartSchedulerAction.SchedulerTask task) {
private Holder createJobScheduler(Scheduler scheduler, Job job, long finalBucketEndMs, long latestRecordTimeMs,
Consumer<Exception> handler, StartSchedulerAction.SchedulerTask task) {
Auditor auditor = jobProvider.audit(job.getId());
Duration frequency = getFrequencyOrDefault(scheduler, job);
Duration queryDelay = Duration.ofSeconds(scheduler.getConfig().getQueryDelay());
DataExtractor dataExtractor = dataExtractorFactory.newExtractor(scheduler.getConfig(), job);
ScheduledJob scheduledJob = new ScheduledJob(job.getId(), frequency.toMillis(), queryDelay.toMillis(),
dataExtractor, client, auditor, currentTimeSupplier, latestFinalBucketEndMs, getLatestRecordTimestamp(job.getId()));
dataExtractor, client, auditor, currentTimeSupplier, finalBucketEndMs, latestRecordTimeMs);
Holder holder = new Holder(scheduler, scheduledJob, new ProblemTracker(() -> auditor), handler);
task.setHolder(holder);
return holder;
}
private long getLatestRecordTimestamp(String jobId) {
long latestRecordTimeMs = -1L;
DataCounts dataCounts = jobProvider.dataCounts(jobId);
if (dataCounts.getLatestRecordTimeStamp() != null) {
latestRecordTimeMs = dataCounts.getLatestRecordTimeStamp().getTime();
private void gatherInformation(String jobId, BiConsumer<QueryPage<Bucket>, DataCounts> handler, Consumer<Exception> errorHandler) {
BucketsQueryBuilder.BucketsQuery latestBucketQuery = new BucketsQueryBuilder()
.sortField(Bucket.TIMESTAMP.getPreferredName())
.sortDescending(true).size(1)
.includeInterim(false)
.build();
jobProvider.buckets(jobId, latestBucketQuery, buckets -> {
jobProvider.dataCounts(jobId, dataCounts -> handler.accept(buckets, dataCounts), errorHandler);
}, e -> {
if (e instanceof ResourceNotFoundException) {
QueryPage<Bucket> empty = new QueryPage<>(Collections.emptyList(), 0, Bucket.RESULT_TYPE_FIELD);
jobProvider.dataCounts(jobId, dataCounts -> handler.accept(empty, dataCounts), errorHandler);
} else {
errorHandler.accept(e);
}
return latestRecordTimeMs;
});
}
private static Duration getFrequencyOrDefault(Scheduler scheduler, Job job) {

View File

@ -43,10 +43,9 @@ import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Stream;
@ -104,13 +103,17 @@ public class AutodetectProcessManagerTests extends ESTestCase {
public void testOpenJob_exceedMaxNumJobs() {
when(jobManager.getJobOrThrowIfUnknown("foo")).thenReturn(createJobDetails("foo"));
when(jobProvider.dataCounts("foo")).thenReturn(new DataCounts("foo"));
doAnswer(invocationOnMock -> {
String jobId = (String) invocationOnMock.getArguments()[0];
@SuppressWarnings("unchecked")
Consumer<DataCounts> handler = (Consumer<DataCounts>) invocationOnMock.getArguments()[1];
handler.accept(new DataCounts(jobId));
return null;
}).when(jobProvider).dataCounts(any(), any(), any());
when(jobManager.getJobOrThrowIfUnknown("bar")).thenReturn(createJobDetails("bar"));
when(jobProvider.dataCounts("bar")).thenReturn(new DataCounts("bar"));
when(jobManager.getJobOrThrowIfUnknown("baz")).thenReturn(createJobDetails("baz"));
when(jobProvider.dataCounts("baz")).thenReturn(new DataCounts("baz"));
when(jobManager.getJobOrThrowIfUnknown("foobar")).thenReturn(createJobDetails("foobar"));
when(jobProvider.dataCounts("foobar")).thenReturn(new DataCounts("foobar"));
Client client = mock(Client.class);
ThreadPool threadPool = mock(ThreadPool.class);
@ -275,7 +278,13 @@ public class AutodetectProcessManagerTests extends ESTestCase {
doThrow(new EsRejectedExecutionException("")).when(executorService).execute(any());
when(threadPool.executor(anyString())).thenReturn(executorService);
when(jobManager.getJobOrThrowIfUnknown("my_id")).thenReturn(createJobDetails("my_id"));
when(jobProvider.dataCounts("my_id")).thenReturn(new DataCounts("my_id"));
doAnswer(invocationOnMock -> {
String jobId = (String) invocationOnMock.getArguments()[0];
@SuppressWarnings("unchecked")
Consumer<DataCounts> handler = (Consumer<DataCounts>) invocationOnMock.getArguments()[1];
handler.accept(new DataCounts(jobId));
return null;
}).when(jobProvider).dataCounts(eq("my_id"), any(), any());
AutodetectResultsParser parser = mock(AutodetectResultsParser.class);
AutodetectProcess autodetectProcess = mock(AutodetectProcess.class);

View File

@ -15,6 +15,7 @@ import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.mock.orig.Mockito;
import org.elasticsearch.search.SearchRequestParsers;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
@ -85,7 +86,13 @@ public class ScheduledJobRunnerTests extends ESTestCase {
}).when(client).execute(same(UpdateSchedulerStatusAction.INSTANCE), any(), any());
JobProvider jobProvider = mock(JobProvider.class);
when(jobProvider.dataCounts(anyString())).thenReturn(new DataCounts("foo"));
Mockito.doAnswer(invocationOnMock -> {
String jobId = (String) invocationOnMock.getArguments()[0];
@SuppressWarnings("unchecked")
Consumer<DataCounts> handler = (Consumer<DataCounts>) invocationOnMock.getArguments()[1];
handler.accept(new DataCounts(jobId));
return null;
}).when(jobProvider).dataCounts(any(), any(), any());
dataExtractorFactory = mock(DataExtractorFactory.class);
Auditor auditor = mock(Auditor.class);
threadPool = mock(ThreadPool.class);