[ML] Use XContentBuilders in try with resource statements (elastic/x-pack-elasticsearch#1329)
* [ML] Use XContentBuilders in try with resource statements
* Address review comments

Original commit: elastic/x-pack-elasticsearch@ef5b45e2f4
This commit is contained in:
parent 0a15b03395
commit 4e25e1a24d
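The change is the same in every touched class (MachineLearningTemplateRegistry, JobDataCountsPersister, JobRenormalizedResultsPersister, JobResultsPersister, FieldConfigWriter, Auditor): each XContentBuilder produced by a mapping or serialisation helper is now opened in a try-with-resources statement, so the builder and the stream behind it are closed even if building the request fails partway through. A minimal sketch of the before/after shape, using a hypothetical buildMapping() helper and "doc_type" name in place of the real ElasticsearchMappings methods and mapping types:

// Sketch only: buildMapping() and "doc_type" are illustrative stand-ins, not the
// actual helpers used in this commit.
import java.io.IOException;

import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;

class TryWithResourcesSketch {

    // Before: the builder is created inline and never closed if an exception escapes.
    void putTemplateBefore(PutIndexTemplateRequest request) throws IOException {
        request.mapping("doc_type", buildMapping());
    }

    // After: the builder is a resource, so it is closed on every exit path.
    void putTemplateAfter(PutIndexTemplateRequest request) throws IOException {
        try (XContentBuilder mapping = buildMapping()) {
            request.mapping("doc_type", mapping);
        }
    }

    private XContentBuilder buildMapping() throws IOException {
        // Placeholder for helpers such as ElasticsearchMappings.auditMessageMapping().
        return XContentFactory.jsonBuilder().startObject().endObject();
    }
}

The diff below applies exactly this pattern throughout.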
@@ -198,11 +198,11 @@ public class MachineLearningTemplateRegistry extends AbstractComponent implemen
      * Index template for notifications
      */
     void putNotificationMessageIndexTemplate(BiConsumer<Boolean, Exception> listener) {
-        try {
+        try (XContentBuilder auditMapping = ElasticsearchMappings.auditMessageMapping()) {
             PutIndexTemplateRequest templateRequest = new PutIndexTemplateRequest(Auditor.NOTIFICATIONS_INDEX);
             templateRequest.patterns(Collections.singletonList(Auditor.NOTIFICATIONS_INDEX));
             templateRequest.settings(mlNotificationIndexSettings());
-            templateRequest.mapping(AuditMessage.TYPE.getPreferredName(), ElasticsearchMappings.auditMessageMapping());
+            templateRequest.mapping(AuditMessage.TYPE.getPreferredName(), auditMapping);
             templateRequest.version(Version.CURRENT.id);
             client.admin().indices().putTemplate(templateRequest,
                     ActionListener.wrap(r -> listener.accept(true, null), e -> listener.accept(false, e)));
@@ -227,10 +227,9 @@ public class MachineLearningTemplateRegistry extends AbstractComponent implemen
     }

     void putJobStateIndexTemplate(BiConsumer<Boolean, Exception> listener) {
-        try {
-            XContentBuilder categorizerStateMapping = ElasticsearchMappings.categorizerStateMapping();
+        try (XContentBuilder categorizerStateMapping = ElasticsearchMappings.categorizerStateMapping();
             XContentBuilder quantilesMapping = ElasticsearchMappings.quantilesMapping();
-            XContentBuilder modelStateMapping = ElasticsearchMappings.modelStateMapping();
+            XContentBuilder modelStateMapping = ElasticsearchMappings.modelStateMapping()) {

             PutIndexTemplateRequest templateRequest = new PutIndexTemplateRequest(AnomalyDetectorsIndex.jobStateIndexName());
             templateRequest.patterns(Collections.singletonList(AnomalyDetectorsIndex.jobStateIndexName()));
@@ -250,11 +249,10 @@ public class MachineLearningTemplateRegistry extends AbstractComponent implemen
     }

     void putJobResultsIndexTemplate(BiConsumer<Boolean, Exception> listener) {
-        try {
-            XContentBuilder resultsMapping = ElasticsearchMappings.resultsMapping();
-            XContentBuilder categoryDefinitionMapping = ElasticsearchMappings.categoryDefinitionMapping();
-            XContentBuilder dataCountsMapping = ElasticsearchMappings.dataCountsMapping();
-            XContentBuilder modelSnapshotMapping = ElasticsearchMappings.modelSnapshotMapping();
+        try (XContentBuilder resultsMapping = ElasticsearchMappings.resultsMapping();
+             XContentBuilder categoryDefinitionMapping = ElasticsearchMappings.categoryDefinitionMapping();
+             XContentBuilder dataCountsMapping = ElasticsearchMappings.dataCountsMapping();
+             XContentBuilder modelSnapshotMapping = ElasticsearchMappings.modelSnapshotMapping()) {

             PutIndexTemplateRequest templateRequest = new PutIndexTemplateRequest(AnomalyDetectorsIndex.jobResultsIndexPrefix());
             templateRequest.patterns(Collections.singletonList(AnomalyDetectorsIndex.jobResultsIndexPrefix() + "*"));
@@ -46,8 +46,7 @@ public class JobDataCountsPersister extends AbstractComponent {
      * @param listener Action response listener
      */
     public void persistDataCounts(String jobId, DataCounts counts, ActionListener<Boolean> listener) {
-        try {
-            XContentBuilder content = serialiseCounts(counts);
+        try (XContentBuilder content = serialiseCounts(counts)) {
             client.prepareIndex(AnomalyDetectorsIndex.jobResultsAliasedName(jobId), DataCounts.TYPE.getPreferredName(),
                     DataCounts.documentId(jobId))
                     .setSource(content).execute(new ActionListener<IndexResponse>() {
@@ -61,7 +60,6 @@ public class JobDataCountsPersister extends AbstractComponent {
                         listener.onFailure(e);
                     }
             });
-
         } catch (IOException ioe) {
             logger.warn((Supplier<?>)() -> new ParameterizedMessage("[{}] Error serialising DataCounts stats", jobId), ioe);
         }
@@ -65,8 +65,7 @@ public class JobRenormalizedResultsPersister extends AbstractComponent {
     }

     public void updateResult(String id, String index, ToXContent resultDoc) {
-        try {
-            XContentBuilder content = toXContentBuilder(resultDoc);
+        try (XContentBuilder content = toXContentBuilder(resultDoc)) {
             bulkRequest.add(new IndexRequest(index, Result.TYPE.getPreferredName(), id).source(content));
         } catch (IOException e) {
             logger.error("Error serialising result", e);
@@ -6,7 +6,6 @@
 package org.elasticsearch.xpack.ml.job.persistence;

 import org.apache.logging.log4j.message.ParameterizedMessage;
-import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
 import org.elasticsearch.action.bulk.BulkRequest;
 import org.elasticsearch.action.bulk.BulkResponse;
@@ -93,8 +92,7 @@ public class JobResultsPersister extends AbstractComponent {
                 bucketWithoutRecords = new Bucket(bucket);
                 bucketWithoutRecords.setRecords(Collections.emptyList());
             }
-            try {
-                XContentBuilder content = toXContentBuilder(bucketWithoutRecords);
+            try (XContentBuilder content = toXContentBuilder(bucketWithoutRecords)) {
                 logger.trace("[{}] ES API CALL: index result type {} to index {} at epoch {}",
                         jobId, Bucket.RESULT_TYPE_VALUE, indexName, bucketWithoutRecords.getEpoch());
@@ -113,12 +111,13 @@ public class JobResultsPersister extends AbstractComponent {
                 throws IOException {
             if (bucketInfluencers != null && bucketInfluencers.isEmpty() == false) {
                 for (BucketInfluencer bucketInfluencer : bucketInfluencers) {
-                    XContentBuilder content = serialiseBucketInfluencerStandalone(bucketInfluencer);
+                    try (XContentBuilder content = toXContentBuilder(bucketInfluencer)) {
                         // Need consistent IDs to ensure overwriting on renormalization
                         String id = bucketInfluencer.getId();
                         logger.trace("[{}] ES BULK ACTION: index result type {} to index {} with ID {}",
                                 jobId, BucketInfluencer.RESULT_TYPE_VALUE, indexName, id);
                         bulkRequest.add(new IndexRequest(indexName, Result.TYPE.getPreferredName(), id).source(content));
+                    }
                 }
             }
         }
@@ -133,10 +132,11 @@ public class JobResultsPersister extends AbstractComponent {

             try {
                 for (AnomalyRecord record : records) {
-                    XContentBuilder content = toXContentBuilder(record);
+                    try (XContentBuilder content = toXContentBuilder(record)) {
                         logger.trace("[{}] ES BULK ACTION: index result type {} to index {} with ID {}",
                                 jobId, AnomalyRecord.RESULT_TYPE_VALUE, indexName, record.getId());
                         bulkRequest.add(new IndexRequest(indexName, Result.TYPE.getPreferredName(), record.getId()).source(content));
+                    }
                 }
             } catch (IOException e) {
                 logger.error(new ParameterizedMessage("[{}] Error serialising records", new Object [] {jobId}), e);
@@ -155,10 +155,11 @@ public class JobResultsPersister extends AbstractComponent {
         public Builder persistInfluencers(List<Influencer> influencers) {
             try {
                 for (Influencer influencer : influencers) {
-                    XContentBuilder content = toXContentBuilder(influencer);
+                    try (XContentBuilder content = toXContentBuilder(influencer)) {
                         logger.trace("[{}] ES BULK ACTION: index result type {} to index {} with ID {}",
                                 jobId, Influencer.RESULT_TYPE_VALUE, indexName, influencer.getId());
                         bulkRequest.add(new IndexRequest(indexName, Result.TYPE.getPreferredName(), influencer.getId()).source(content));
+                    }
                 }
             } catch (IOException e) {
                 logger.error(new ParameterizedMessage("[{}] Error serialising influencers", new Object[] {jobId}), e);
@@ -174,13 +175,13 @@ public class JobResultsPersister extends AbstractComponent {
          * @return this
          */
         public Builder persistPerPartitionMaxProbabilities(PerPartitionMaxProbabilities partitionProbabilities) {
-            try {
-                XContentBuilder builder = toXContentBuilder(partitionProbabilities);
+            try (XContentBuilder builder = toXContentBuilder(partitionProbabilities)) {
                 logger.trace("[{}] ES API CALL: index result type {} to index {} at timestamp {} with ID {}",
                         jobId, PerPartitionMaxProbabilities.RESULT_TYPE_VALUE, indexName, partitionProbabilities.getTimestamp(),
                         partitionProbabilities.getId());
                 bulkRequest.add(
                         new IndexRequest(indexName, Result.TYPE.getPreferredName(), partitionProbabilities.getId()).source(builder));

             } catch (IOException e) {
                 logger.error(new ParameterizedMessage("[{}] error serialising bucket per partition max normalized scores",
                         new Object[]{jobId}), e);
@@ -313,12 +314,6 @@ public class JobResultsPersister extends AbstractComponent {
         return builder;
     }

-    private XContentBuilder serialiseBucketInfluencerStandalone(BucketInfluencer bucketInfluencer) throws IOException {
-        XContentBuilder builder = jsonBuilder();
-        bucketInfluencer.toXContent(builder, ToXContent.EMPTY_PARAMS);
-        return builder;
-    }
-
     private class Persistable {

         private final String jobId;
@@ -341,9 +336,8 @@ public class JobResultsPersister extends AbstractComponent {

             logCall(indexName);

-            try {
-                IndexRequest indexRequest = new IndexRequest(indexName, type, id)
-                        .source(toXContentBuilder(object));
+            try (XContentBuilder content = toXContentBuilder(object)) {
+                IndexRequest indexRequest = new IndexRequest(indexName, type, id).source(content);
                 client.index(indexRequest).actionGet();
                 return true;
             } catch (IOException e) {
@@ -17,6 +17,7 @@ import org.apache.logging.log4j.Logger;

 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.xcontent.ToXContent;
+import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.XContentFactory;
 import org.elasticsearch.xpack.ml.job.config.AnalysisConfig;
 import org.elasticsearch.xpack.ml.job.config.Detector;
@@ -112,7 +113,9 @@ public class FieldConfigWriter {
                 } else {
                     contents.append(',');
                 }
-                contents.append(rule.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS).string());
+                try (XContentBuilder contentBuilder = XContentFactory.jsonBuilder()) {
+                    contents.append(rule.toXContent(contentBuilder, ToXContent.EMPTY_PARAMS).string());
+                }
             }
             contents.append(']');
@@ -65,8 +65,8 @@ public class Auditor {
     }

     private XContentBuilder toXContentBuilder(ToXContent toXContent) {
-        try {
-            return toXContent.toXContent(jsonBuilder(), ToXContent.EMPTY_PARAMS);
+        try (XContentBuilder jsonBuilder = jsonBuilder()) {
+            return toXContent.toXContent(jsonBuilder, ToXContent.EMPTY_PARAMS);
         } catch (IOException e) {
             throw new RuntimeException(e);
         }
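In the bulk persisters of JobResultsPersister above, the try-with-resources block sits inside the for loop, so a fresh builder is opened and closed for each record, influencer, or bucket influencer before its IndexRequest is added to the bulk request. A rough sketch of that per-document shape, with illustrative names (BulkPersistSketch, docs, indexName, serialise) rather than the actual fields and helpers:

// Sketch only: class, field, and helper names are illustrative, not the real
// JobResultsPersister members; it shows the one-builder-per-document pattern.
import java.io.IOException;
import java.util.List;

import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;

class BulkPersistSketch {

    private final BulkRequest bulkRequest = new BulkRequest();
    private final String indexName = "example-index";

    void addAll(List<? extends ToXContent> docs) throws IOException {
        int id = 0;
        for (ToXContent doc : docs) {
            // One builder per document; it is closed before the next iteration starts.
            try (XContentBuilder content = serialise(doc)) {
                bulkRequest.add(new IndexRequest(indexName, "doc", Integer.toString(id++)).source(content));
            }
        }
    }

    private XContentBuilder serialise(ToXContent doc) throws IOException {
        XContentBuilder builder = XContentFactory.jsonBuilder();
        doc.toXContent(builder, ToXContent.EMPTY_PARAMS);
        return builder;
    }
}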