Made client calls non blocking in JobProvider#modelSizeStats(...)

and FixBlockingClientOperations in two places where blocking client calls are ok,
because these methods aren't called from a network thread.

Original commit: elastic/x-pack-elasticsearch@a6dc34651c
This commit is contained in:
Martijn van Groningen 2017-01-06 15:08:10 +01:00
parent ae8695a22d
commit d7f6de7133
3 changed files with 77 additions and 55 deletions

View File

@ -46,6 +46,7 @@ import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.Collectors;
@ -308,8 +309,7 @@ public class GetJobsStatsAction extends Action<GetJobsStatsAction.Request, GetJo
for (int i = 0; i < jobs.results().size(); i++) {
int slot = i;
Job job = jobs.results().get(slot);
readDataCounts(job.getId(), dataCounts -> {
ModelSizeStats modelSizeStats = readModelSizeStats(job.getId());
gatherDataCountsAndModelSizeStats(job.getId(), (dataCounts, modelSizeStats) -> {
JobStatus status = prelertMetadata.getAllocations().get(job.getId()).getStatus();
jobsStats.setOnce(slot, new Response.JobStats(job.getId(), dataCounts, modelSizeStats, status));
@ -323,6 +323,15 @@ public class GetJobsStatsAction extends Action<GetJobsStatsAction.Request, GetJo
}
}
private void gatherDataCountsAndModelSizeStats(String jobId, BiConsumer<DataCounts, ModelSizeStats> handler,
Consumer<Exception> errorHandler) {
readDataCounts(jobId, dataCounts -> {
readModelSizeStats(jobId, modelSizeStats -> {
handler.accept(dataCounts, modelSizeStats);
}, errorHandler);
}, errorHandler);
}
private void readDataCounts(String jobId, Consumer<DataCounts> handler, Consumer<Exception> errorHandler) {
Optional<DataCounts> counts = processManager.getDataCounts(jobId);
if (counts.isPresent()) {
@ -332,9 +341,13 @@ public class GetJobsStatsAction extends Action<GetJobsStatsAction.Request, GetJo
}
}
private ModelSizeStats readModelSizeStats(String jobId) {
private void readModelSizeStats(String jobId, Consumer<ModelSizeStats> handler, Consumer<Exception> errorHandler) {
Optional<ModelSizeStats> sizeStats = processManager.getModelSizeStats(jobId);
return sizeStats.orElseGet(() -> jobProvider.modelSizeStats(jobId).orElse(null));
if (sizeStats.isPresent()) {
handler.accept(sizeStats.get());
} else {
jobProvider.modelSizeStats(jobId, handler, errorHandler);
}
}
}
}

View File

@ -16,7 +16,6 @@ import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
import org.elasticsearch.action.get.GetAction;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.search.MultiSearchRequest;
@ -853,12 +852,23 @@ public class JobProvider {
LOGGER.trace("ES API CALL: get ID {} type {} from index {}", quantilesId, Quantiles.TYPE.getPreferredName(), indexName);
GetRequest getRequest = new GetRequest(indexName, Quantiles.TYPE.getPreferredName(), quantilesId);
GetResponse response = FixBlockingClientOperations.executeBlocking(client, GetAction.INSTANCE, getRequest);
// can be blocking as it is called from a thread from generic pool:
GetResponse response = client.get(getRequest).actionGet();
if (!response.isExists()) {
LOGGER.info("There are currently no quantiles for job " + jobId);
return Optional.empty();
}
return Optional.of(createQuantiles(jobId, response));
BytesReference source = response.getSourceAsBytesRef();
try (XContentParser parser = XContentFactory.xContent(source).createParser(NamedXContentRegistry.EMPTY, source)) {
Quantiles quantiles = Quantiles.PARSER.apply(parser, () -> parseFieldMatcher);
if (quantiles.getQuantileState() == null) {
LOGGER.error("Inconsistency - no " + Quantiles.QUANTILE_STATE
+ " field in quantiles for job " + jobId);
}
return Optional.of(quantiles);
} catch (IOException e) {
throw new ElasticsearchParseException("failed to parse quantiles", e);
}
} catch (IndexNotFoundException e) {
LOGGER.error("Missing index when getting quantiles", e);
throw e;
@ -1026,22 +1036,6 @@ public class JobProvider {
stream.write(0);
}
private Quantiles createQuantiles(String jobId, GetResponse response) {
BytesReference source = response.getSourceAsBytesRef();
XContentParser parser;
try {
parser = XContentFactory.xContent(source).createParser(NamedXContentRegistry.EMPTY, source);
} catch (IOException e) {
throw new ElasticsearchParseException("failed to parse quantiles", e);
}
Quantiles quantiles = Quantiles.PARSER.apply(parser, () -> parseFieldMatcher);
if (quantiles.getQuantileState() == null) {
LOGGER.error("Inconsistency - no " + Quantiles.QUANTILE_STATE
+ " field in quantiles for job " + jobId);
}
return quantiles;
}
public QueryPage<ModelDebugOutput> modelDebugOutput(String jobId, int from, int size) {
SearchResponse searchResponse;
try {
@ -1076,35 +1070,34 @@ public class JobProvider {
/**
* Get the job's model size stats.
*/
public Optional<ModelSizeStats> modelSizeStats(String jobId) {
public void modelSizeStats(String jobId, Consumer<ModelSizeStats> handler, Consumer<Exception> errorHandler) {
String indexName = AnomalyDetectorsIndex.jobResultsIndexName(jobId);
try {
LOGGER.trace("ES API CALL: get result type {} ID {} from index {}",
ModelSizeStats.RESULT_TYPE_VALUE, ModelSizeStats.RESULT_TYPE_FIELD, indexName);
GetRequest getRequest =
new GetRequest(indexName, Result.TYPE.getPreferredName(), ModelSizeStats.RESULT_TYPE_FIELD.getPreferredName());
GetResponse modelSizeStatsResponse = FixBlockingClientOperations.executeBlocking(client, GetAction.INSTANCE, getRequest);
if (!modelSizeStatsResponse.isExists()) {
String msg = "No memory usage details for job with id " + jobId;
LOGGER.warn(msg);
return Optional.empty();
} else {
BytesReference source = modelSizeStatsResponse.getSourceAsBytesRef();
XContentParser parser;
try {
parser = XContentFactory.xContent(source).createParser(NamedXContentRegistry.EMPTY, source);
client.get(getRequest, ActionListener.wrap(response -> {
if (response.isExists()) {
BytesReference source = response.getSourceAsBytesRef();
try (XContentParser parser = XContentFactory.xContent(source).createParser(NamedXContentRegistry.EMPTY, source)) {
ModelSizeStats modelSizeStats = ModelSizeStats.PARSER.apply(parser, () -> parseFieldMatcher).build();
handler.accept(modelSizeStats);
} catch (IOException e) {
throw new ElasticsearchParseException("failed to parse model size stats", e);
}
ModelSizeStats modelSizeStats = ModelSizeStats.PARSER.apply(parser, () -> parseFieldMatcher).build();
return Optional.of(modelSizeStats);
} else {
String msg = "No memory usage details for job with id " + jobId;
LOGGER.warn(msg);
handler.accept(null);
}
} catch (IndexNotFoundException e) {
LOGGER.warn("Missing index " + indexName, e);
return Optional.empty();
}, e -> {
if (e instanceof IndexNotFoundException) {
handler.accept(null);
} else {
errorHandler.accept(e);
}
}));
}
/**
@ -1115,19 +1108,18 @@ public class JobProvider {
*/
public Optional<ListDocument> getList(String listId) {
GetRequest getRequest = new GetRequest(PRELERT_INFO_INDEX, ListDocument.TYPE.getPreferredName(), listId);
GetResponse response = FixBlockingClientOperations.executeBlocking(client, GetAction.INSTANCE, getRequest);
// can be blocking as it is called from a thread from generic pool:
GetResponse response = client.get(getRequest).actionGet();
if (!response.isExists()) {
return Optional.empty();
}
BytesReference source = response.getSourceAsBytesRef();
XContentParser parser;
try {
parser = XContentFactory.xContent(source).createParser(NamedXContentRegistry.EMPTY, source);
try (XContentParser parser = XContentFactory.xContent(source).createParser(NamedXContentRegistry.EMPTY, source)) {
ListDocument listDocument = ListDocument.PARSER.apply(parser, () -> parseFieldMatcher);
return Optional.of(listDocument);
} catch (IOException e) {
throw new ElasticsearchParseException("failed to parse list", e);
}
ListDocument listDocument = ListDocument.PARSER.apply(parser, () -> parseFieldMatcher);
return Optional.of(listDocument);
}
/**

View File

@ -138,9 +138,8 @@ public class AutodetectResultProcessorIT extends ESSingleNodeTestCase {
assertEquals(1, persistedModelDebugOutput.count());
assertEquals(modelDebugOutput, persistedModelDebugOutput.results().get(0));
Optional<ModelSizeStats> persistedModelSizeStats = jobProvider.modelSizeStats(JOB_ID);
assertTrue(persistedModelSizeStats.isPresent());
assertEquals(modelSizeStats, persistedModelSizeStats.get());
ModelSizeStats persistedModelSizeStats = getModelSizeStats();
assertEquals(modelSizeStats, persistedModelSizeStats);
QueryPage<ModelSnapshot> persistedModelSnapshot = jobProvider.modelSnapshots(JOB_ID, 0, 100);
assertEquals(1, persistedModelSnapshot.count());
@ -485,4 +484,22 @@ public class AutodetectResultProcessorIT extends ESSingleNodeTestCase {
}
return resultHolder.get();
}
private ModelSizeStats getModelSizeStats() throws Exception {
AtomicReference<Exception> errorHolder = new AtomicReference<>();
AtomicReference<ModelSizeStats> resultHolder = new AtomicReference<>();
CountDownLatch latch = new CountDownLatch(1);
jobProvider.modelSizeStats(JOB_ID, modelSizeStats -> {
resultHolder.set(modelSizeStats);
latch.countDown();
}, e -> {
errorHolder.set(e);
latch.countDown();
});
latch.await();
if (errorHolder.get() != null) {
throw errorHolder.get();
}
return resultHolder.get();
}
}