Original commit: elastic/x-pack-elasticsearch@d300be2dde
This commit is contained in:
David Kyle 2017-01-18 13:35:25 +00:00 committed by GitHub
parent b2917376f0
commit 4c0d2a492d
1 changed files with 50 additions and 70 deletions

View File

@ -85,7 +85,9 @@ import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Supplier;
public class JobProvider {
private static final Logger LOGGER = Loggers.getLogger(JobProvider.class);
@ -285,18 +287,24 @@ public class JobProvider {
* @param jobId The job id
*/
public void dataCounts(String jobId, Consumer<DataCounts> handler, Consumer<Exception> errorHandler) {
get(jobId, DataCounts.TYPE.getPreferredName(), jobId + DataCounts.DOCUMENT_SUFFIX, handler, errorHandler,
DataCounts.PARSER, () -> new DataCounts(jobId));
}
private <T, U> void get(String jobId, String type, String id, Consumer<T> handler, Consumer<Exception> errorHandler,
BiFunction<XContentParser, U, T> objectParser, Supplier<T> notFoundSupplier) {
String indexName = AnomalyDetectorsIndex.jobResultsIndexName(jobId);
GetRequest getRequest = new GetRequest(indexName, DataCounts.TYPE.getPreferredName(), jobId + DataCounts.DOCUMENT_SUFFIX);
GetRequest getRequest = new GetRequest(indexName, type, id);
client.get(getRequest, ActionListener.wrap(
response -> {
if (response.isExists() == false) {
handler.accept(new DataCounts(jobId));
handler.accept(notFoundSupplier.get());
} else {
BytesReference source = response.getSourceAsBytesRef();
try (XContentParser parser = XContentFactory.xContent(source).createParser(NamedXContentRegistry.EMPTY, source)) {
handler.accept(DataCounts.PARSER.apply(parser, null));
handler.accept(objectParser.apply(parser, null));
} catch (IOException e) {
throw new ElasticsearchParseException("failed to parse bucket", e);
throw new ElasticsearchParseException("failed to parse " + type, e);
}
}
},
@ -309,6 +317,25 @@ public class JobProvider {
}));
}
private <T, U> Optional<T> getBlocking(String indexName, String type, String id, BiFunction<XContentParser, U, T> objectParser) {
GetRequest getRequest = new GetRequest(indexName, type, id);
try {
GetResponse response = client.get(getRequest).actionGet();
if (!response.isExists()) {
return Optional.empty();
}
BytesReference source = response.getSourceAsBytesRef();
try (XContentParser parser = XContentFactory.xContent(source).createParser(NamedXContentRegistry.EMPTY, source)) {
return Optional.of(objectParser.apply(parser, null));
} catch (IOException e) {
throw new ElasticsearchParseException("failed to parse " + type, e);
}
} catch (IndexNotFoundException e) {
LOGGER.error("Missing index when getting " + type, e);
throw e;
}
}
/**
* Search for buckets with the parameters in the {@link BucketsQueryBuilder}
*/
@ -659,9 +686,9 @@ public class JobProvider {
* The returned records have their id set.
*/
private void records(String jobId, int from, int size,
QueryBuilder recordFilter, FieldSortBuilder sb, List<String> secondarySort,
boolean descending, Consumer<QueryPage<AnomalyRecord>> handler,
Consumer<Exception> errorHandler) {
QueryBuilder recordFilter, FieldSortBuilder sb, List<String> secondarySort,
boolean descending, Consumer<QueryPage<AnomalyRecord>> handler,
Consumer<Exception> errorHandler) {
String indexName = AnomalyDetectorsIndex.jobResultsIndexName(jobId);
recordFilter = new BoolQueryBuilder()
@ -771,32 +798,15 @@ public class JobProvider {
*/
public Optional<Quantiles> getQuantiles(String jobId) {
String indexName = AnomalyDetectorsIndex.jobStateIndexName();
try {
String quantilesId = Quantiles.quantilesId(jobId);
String quantilesId = Quantiles.quantilesId(jobId);
LOGGER.trace("ES API CALL: get ID {} type {} from index {}", quantilesId, Quantiles.TYPE.getPreferredName(), indexName);
LOGGER.trace("ES API CALL: get ID {} type {} from index {}", quantilesId, Quantiles.TYPE.getPreferredName(), indexName);
GetRequest getRequest = new GetRequest(indexName, Quantiles.TYPE.getPreferredName(), quantilesId);
// can be blocking as it is called from a thread from generic pool:
GetResponse response = client.get(getRequest).actionGet();
if (!response.isExists()) {
LOGGER.info("There are currently no quantiles for job " + jobId);
return Optional.empty();
}
BytesReference source = response.getSourceAsBytesRef();
try (XContentParser parser = XContentFactory.xContent(source).createParser(NamedXContentRegistry.EMPTY, source)) {
Quantiles quantiles = Quantiles.PARSER.apply(parser, null);
if (quantiles.getQuantileState() == null) {
LOGGER.error("Inconsistency - no " + Quantiles.QUANTILE_STATE
+ " field in quantiles for job " + jobId);
}
return Optional.of(quantiles);
} catch (IOException e) {
throw new ElasticsearchParseException("failed to parse quantiles", e);
}
} catch (IndexNotFoundException e) {
LOGGER.error("Missing index when getting quantiles", e);
throw e;
Optional<Quantiles> quantiles = getBlocking(indexName, Quantiles.TYPE.getPreferredName(), quantilesId, Quantiles.PARSER);
if (quantiles.isPresent() && quantiles.get().getQuantileState() == null) {
LOGGER.error("Inconsistency - no " + Quantiles.QUANTILE_STATE + " field in quantiles for job " + jobId);
}
return quantiles;
}
/**
@ -1002,33 +1012,15 @@ public class JobProvider {
* Get the job's model size stats.
*/
public void modelSizeStats(String jobId, Consumer<ModelSizeStats> handler, Consumer<Exception> errorHandler) {
String indexName = AnomalyDetectorsIndex.jobResultsIndexName(jobId);
LOGGER.trace("ES API CALL: get result type {} ID {} from index {}",
ModelSizeStats.RESULT_TYPE_VALUE, ModelSizeStats.RESULT_TYPE_FIELD, indexName);
LOGGER.trace("ES API CALL: get result type {} ID {} for job {}",
ModelSizeStats.RESULT_TYPE_VALUE, ModelSizeStats.RESULT_TYPE_FIELD, jobId);
GetRequest getRequest =
new GetRequest(indexName, Result.TYPE.getPreferredName(), ModelSizeStats.RESULT_TYPE_FIELD.getPreferredName());
client.get(getRequest, ActionListener.wrap(response -> {
if (response.isExists()) {
BytesReference source = response.getSourceAsBytesRef();
try (XContentParser parser = XContentFactory.xContent(source).createParser(NamedXContentRegistry.EMPTY, source)) {
ModelSizeStats modelSizeStats = ModelSizeStats.PARSER.apply(parser, null).build();
handler.accept(modelSizeStats);
} catch (IOException e) {
throw new ElasticsearchParseException("failed to parse model size stats", e);
}
} else {
String msg = "No memory usage details for job with id " + jobId;
LOGGER.warn(msg);
handler.accept(null);
}
}, e -> {
if (e instanceof IndexNotFoundException) {
handler.accept(null);
} else {
errorHandler.accept(e);
}
}));
get(jobId, Result.TYPE.getPreferredName(), ModelSizeStats.RESULT_TYPE_FIELD.getPreferredName(),
handler, errorHandler, (parser, context) -> ModelSizeStats.PARSER.apply(parser, null).build(),
() -> {
LOGGER.warn("No memory usage details for job with id {}", jobId);
return null;
});
}
/**
@ -1038,19 +1030,7 @@ public class JobProvider {
* @return the matching list if it exists
*/
public Optional<ListDocument> getList(String listId) {
GetRequest getRequest = new GetRequest(ML_INFO_INDEX, ListDocument.TYPE.getPreferredName(), listId);
// can be blocking as it is called from a thread from generic pool:
GetResponse response = client.get(getRequest).actionGet();
if (!response.isExists()) {
return Optional.empty();
}
BytesReference source = response.getSourceAsBytesRef();
try (XContentParser parser = XContentFactory.xContent(source).createParser(NamedXContentRegistry.EMPTY, source)) {
ListDocument listDocument = ListDocument.PARSER.apply(parser, null);
return Optional.of(listDocument);
} catch (IOException e) {
throw new ElasticsearchParseException("failed to parse list", e);
}
return getBlocking(ML_INFO_INDEX, ListDocument.TYPE.getPreferredName(), listId, ListDocument.PARSER);
}
/**