From d7f6de7133230bba67f5567f6a0e243a5b8aaee0 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Fri, 6 Jan 2017 15:08:10 +0100 Subject: [PATCH] Made client calls non blocking in JobProvider#modelSizeStats(...) and FixBlockingClientOperations in two places where blocking client calls are ok, because these methods aren't called from a network thread. Original commit: elastic/x-pack-elasticsearch@a6dc34651c24ea7bb2db068c1908d8c1a75fb1b5 --- .../prelert/action/GetJobsStatsAction.java | 21 ++++- .../prelert/job/persistence/JobProvider.java | 88 +++++++++---------- .../AutodetectResultProcessorIT.java | 23 ++++- 3 files changed, 77 insertions(+), 55 deletions(-) diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/action/GetJobsStatsAction.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/action/GetJobsStatsAction.java index 78417022c58..f17cd4adf98 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/action/GetJobsStatsAction.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/action/GetJobsStatsAction.java @@ -46,6 +46,7 @@ import java.util.List; import java.util.Objects; import java.util.Optional; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.stream.Collectors; @@ -308,8 +309,7 @@ public class GetJobsStatsAction extends Action { - ModelSizeStats modelSizeStats = readModelSizeStats(job.getId()); + gatherDataCountsAndModelSizeStats(job.getId(), (dataCounts, modelSizeStats) -> { JobStatus status = prelertMetadata.getAllocations().get(job.getId()).getStatus(); jobsStats.setOnce(slot, new Response.JobStats(job.getId(), dataCounts, modelSizeStats, status)); @@ -323,6 +323,15 @@ public class GetJobsStatsAction extends Action handler, + Consumer errorHandler) { + readDataCounts(jobId, dataCounts -> { + readModelSizeStats(jobId, modelSizeStats -> { + handler.accept(dataCounts, modelSizeStats); + }, errorHandler); + }, errorHandler); + } + private void readDataCounts(String jobId, Consumer handler, Consumer errorHandler) { Optional counts = processManager.getDataCounts(jobId); if (counts.isPresent()) { @@ -332,9 +341,13 @@ public class GetJobsStatsAction extends Action handler, Consumer errorHandler) { Optional sizeStats = processManager.getModelSizeStats(jobId); - return sizeStats.orElseGet(() -> jobProvider.modelSizeStats(jobId).orElse(null)); + if (sizeStats.isPresent()) { + handler.accept(sizeStats.get()); + } else { + jobProvider.modelSizeStats(jobId, handler, errorHandler); + } } } } diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/persistence/JobProvider.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/persistence/JobProvider.java index 3cbfd2ef8ed..788cdd984a7 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/persistence/JobProvider.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/persistence/JobProvider.java @@ -16,7 +16,6 @@ import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse; -import org.elasticsearch.action.get.GetAction; import org.elasticsearch.action.get.GetRequest; import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.search.MultiSearchRequest; @@ -853,12 +852,23 @@ public class JobProvider { LOGGER.trace("ES API CALL: get ID {} type {} from index {}", quantilesId, Quantiles.TYPE.getPreferredName(), indexName); GetRequest getRequest = new GetRequest(indexName, Quantiles.TYPE.getPreferredName(), quantilesId); - GetResponse response = FixBlockingClientOperations.executeBlocking(client, GetAction.INSTANCE, getRequest); + // can be blocking as it is called from a thread from generic pool: + GetResponse response = client.get(getRequest).actionGet(); if (!response.isExists()) { LOGGER.info("There are currently no quantiles for job " + jobId); return Optional.empty(); } - return Optional.of(createQuantiles(jobId, response)); + BytesReference source = response.getSourceAsBytesRef(); + try (XContentParser parser = XContentFactory.xContent(source).createParser(NamedXContentRegistry.EMPTY, source)) { + Quantiles quantiles = Quantiles.PARSER.apply(parser, () -> parseFieldMatcher); + if (quantiles.getQuantileState() == null) { + LOGGER.error("Inconsistency - no " + Quantiles.QUANTILE_STATE + + " field in quantiles for job " + jobId); + } + return Optional.of(quantiles); + } catch (IOException e) { + throw new ElasticsearchParseException("failed to parse quantiles", e); + } } catch (IndexNotFoundException e) { LOGGER.error("Missing index when getting quantiles", e); throw e; @@ -1026,22 +1036,6 @@ public class JobProvider { stream.write(0); } - private Quantiles createQuantiles(String jobId, GetResponse response) { - BytesReference source = response.getSourceAsBytesRef(); - XContentParser parser; - try { - parser = XContentFactory.xContent(source).createParser(NamedXContentRegistry.EMPTY, source); - } catch (IOException e) { - throw new ElasticsearchParseException("failed to parse quantiles", e); - } - Quantiles quantiles = Quantiles.PARSER.apply(parser, () -> parseFieldMatcher); - if (quantiles.getQuantileState() == null) { - LOGGER.error("Inconsistency - no " + Quantiles.QUANTILE_STATE - + " field in quantiles for job " + jobId); - } - return quantiles; - } - public QueryPage modelDebugOutput(String jobId, int from, int size) { SearchResponse searchResponse; try { @@ -1076,35 +1070,34 @@ public class JobProvider { /** * Get the job's model size stats. */ - public Optional modelSizeStats(String jobId) { + public void modelSizeStats(String jobId, Consumer handler, Consumer errorHandler) { String indexName = AnomalyDetectorsIndex.jobResultsIndexName(jobId); - try { - LOGGER.trace("ES API CALL: get result type {} ID {} from index {}", - ModelSizeStats.RESULT_TYPE_VALUE, ModelSizeStats.RESULT_TYPE_FIELD, indexName); + LOGGER.trace("ES API CALL: get result type {} ID {} from index {}", + ModelSizeStats.RESULT_TYPE_VALUE, ModelSizeStats.RESULT_TYPE_FIELD, indexName); - GetRequest getRequest = - new GetRequest(indexName, Result.TYPE.getPreferredName(), ModelSizeStats.RESULT_TYPE_FIELD.getPreferredName()); - GetResponse modelSizeStatsResponse = FixBlockingClientOperations.executeBlocking(client, GetAction.INSTANCE, getRequest); - - if (!modelSizeStatsResponse.isExists()) { - String msg = "No memory usage details for job with id " + jobId; - LOGGER.warn(msg); - return Optional.empty(); - } else { - BytesReference source = modelSizeStatsResponse.getSourceAsBytesRef(); - XContentParser parser; - try { - parser = XContentFactory.xContent(source).createParser(NamedXContentRegistry.EMPTY, source); + GetRequest getRequest = + new GetRequest(indexName, Result.TYPE.getPreferredName(), ModelSizeStats.RESULT_TYPE_FIELD.getPreferredName()); + client.get(getRequest, ActionListener.wrap(response -> { + if (response.isExists()) { + BytesReference source = response.getSourceAsBytesRef(); + try (XContentParser parser = XContentFactory.xContent(source).createParser(NamedXContentRegistry.EMPTY, source)) { + ModelSizeStats modelSizeStats = ModelSizeStats.PARSER.apply(parser, () -> parseFieldMatcher).build(); + handler.accept(modelSizeStats); } catch (IOException e) { throw new ElasticsearchParseException("failed to parse model size stats", e); } - ModelSizeStats modelSizeStats = ModelSizeStats.PARSER.apply(parser, () -> parseFieldMatcher).build(); - return Optional.of(modelSizeStats); + } else { + String msg = "No memory usage details for job with id " + jobId; + LOGGER.warn(msg); + handler.accept(null); } - } catch (IndexNotFoundException e) { - LOGGER.warn("Missing index " + indexName, e); - return Optional.empty(); - } + }, e -> { + if (e instanceof IndexNotFoundException) { + handler.accept(null); + } else { + errorHandler.accept(e); + } + })); } /** @@ -1115,19 +1108,18 @@ public class JobProvider { */ public Optional getList(String listId) { GetRequest getRequest = new GetRequest(PRELERT_INFO_INDEX, ListDocument.TYPE.getPreferredName(), listId); - GetResponse response = FixBlockingClientOperations.executeBlocking(client, GetAction.INSTANCE, getRequest); + // can be blocking as it is called from a thread from generic pool: + GetResponse response = client.get(getRequest).actionGet(); if (!response.isExists()) { return Optional.empty(); } BytesReference source = response.getSourceAsBytesRef(); - XContentParser parser; - try { - parser = XContentFactory.xContent(source).createParser(NamedXContentRegistry.EMPTY, source); + try (XContentParser parser = XContentFactory.xContent(source).createParser(NamedXContentRegistry.EMPTY, source)) { + ListDocument listDocument = ListDocument.PARSER.apply(parser, () -> parseFieldMatcher); + return Optional.of(listDocument); } catch (IOException e) { throw new ElasticsearchParseException("failed to parse list", e); } - ListDocument listDocument = ListDocument.PARSER.apply(parser, () -> parseFieldMatcher); - return Optional.of(listDocument); } /** diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/integration/AutodetectResultProcessorIT.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/integration/AutodetectResultProcessorIT.java index cfe056c1097..cdd13be7219 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/integration/AutodetectResultProcessorIT.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/integration/AutodetectResultProcessorIT.java @@ -138,9 +138,8 @@ public class AutodetectResultProcessorIT extends ESSingleNodeTestCase { assertEquals(1, persistedModelDebugOutput.count()); assertEquals(modelDebugOutput, persistedModelDebugOutput.results().get(0)); - Optional persistedModelSizeStats = jobProvider.modelSizeStats(JOB_ID); - assertTrue(persistedModelSizeStats.isPresent()); - assertEquals(modelSizeStats, persistedModelSizeStats.get()); + ModelSizeStats persistedModelSizeStats = getModelSizeStats(); + assertEquals(modelSizeStats, persistedModelSizeStats); QueryPage persistedModelSnapshot = jobProvider.modelSnapshots(JOB_ID, 0, 100); assertEquals(1, persistedModelSnapshot.count()); @@ -485,4 +484,22 @@ public class AutodetectResultProcessorIT extends ESSingleNodeTestCase { } return resultHolder.get(); } + + private ModelSizeStats getModelSizeStats() throws Exception { + AtomicReference errorHolder = new AtomicReference<>(); + AtomicReference resultHolder = new AtomicReference<>(); + CountDownLatch latch = new CountDownLatch(1); + jobProvider.modelSizeStats(JOB_ID, modelSizeStats -> { + resultHolder.set(modelSizeStats); + latch.countDown(); + }, e -> { + errorHolder.set(e); + latch.countDown(); + }); + latch.await(); + if (errorHolder.get() != null) { + throw errorHolder.get(); + } + return resultHolder.get(); + } }