From 2147d217df8039d10e53317ff36d5e6368a67a09 Mon Sep 17 00:00:00 2001
From: Lee Hinman
Date: Sun, 4 Mar 2018 16:48:15 -0700
Subject: [PATCH] Wrap stream passed to createParser in try-with-resources
 (elastic/x-pack-elasticsearch#4055)

This wraps the stream (`.streamInput()`) that is passed to many of the
`createParser` instances in the enclosing (or a new) try-with-resources block.
This ensures the `BytesReference.streamInput()` is closed.

Relates to elastic/x-pack-elasticsearch#28504

Original commit: elastic/x-pack-elasticsearch@7546e3b4d470510892576df24cdf3bb010714ac0
---
 .../org/elasticsearch/license/License.java    | 66 ++++++++++---------
 .../core/ml/action/PreviewDatafeedAction.java |  5 +-
 .../autodetect/state/ModelSnapshot.java       |  6 +-
 .../user/ChangePasswordRequestBuilder.java    |  6 +-
 .../action/user/PutUserRequestBuilder.java    |  6 +-
 .../support/mapper/ExpressionRoleMapping.java |  6 +-
 .../expressiondsl/ExpressionParser.java       |  5 +-
 .../core/security/authz/RoleDescriptor.java   | 11 ++--
 .../support/xcontent/XContentSource.java      | 11 ++--
 .../ml/action/TransportGetFiltersAction.java  | 23 ++++---
 .../persistence/BatchedBucketsIterator.java   | 12 ++--
 .../BatchedInfluencersIterator.java           | 13 ++--
 .../persistence/BatchedRecordsIterator.java   | 12 ++--
 .../xpack/ml/job/persistence/JobProvider.java | 41 +++++++-----
 .../process/logging/CppLogMessageHandler.java |  6 +-
 .../output/NormalizerResultHandler.java       | 10 +--
 .../retention/ExpiredForecastsRemover.java    | 13 ++--
 .../exporter/FilteredMonitoringDoc.java       |  4 +-
 .../xpack/rollup/RollupRequestTranslator.java |  6 +-
 .../mapper/NativeRoleMappingStore.java        | 10 ++-
 .../xpack/sql/client/HttpClient.java          |  5 +-
 .../input/http/ExecutableHttpInput.java       |  6 +-
 .../attachment/ReportingAttachmentParser.java |  6 +-
 .../notification/jira/JiraAccount.java        | 10 ++-
 .../watcher/notification/jira/JiraIssue.java  |  6 +-
 .../notification/pagerduty/SentEvent.java     |  9 +--
 .../search/WatcherSearchTemplateService.java  |  6 +-
 .../xpack/watcher/watch/WatchParser.java      |  8 ++-
 28 files changed, 193 insertions(+), 135 deletions(-)

diff --git a/plugin/core/src/main/java/org/elasticsearch/license/License.java b/plugin/core/src/main/java/org/elasticsearch/license/License.java
index 7a85dfa2270..1e0187231d5 100644
--- a/plugin/core/src/main/java/org/elasticsearch/license/License.java
+++ b/plugin/core/src/main/java/org/elasticsearch/license/License.java
@@ -6,6 +6,7 @@ package org.elasticsearch.license;
 import java.io.IOException;
+import java.io.InputStream;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Base64;
@@ -497,44 +498,47 @@ public class License implements ToXContentObject {
             throw new ElasticsearchParseException("failed to parse license - no content-type provided");
         }
         // EMPTY is safe here because we don't call namedObject
-        final XContentParser parser = xContentType.xContent()
-                .createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, bytes.streamInput());
-        License license = null;
-        if (parser.nextToken() == XContentParser.Token.START_OBJECT) {
-            if (parser.nextToken() == XContentParser.Token.FIELD_NAME) {
-                String currentFieldName = parser.currentName();
-                if (Fields.LICENSES.equals(currentFieldName)) {
-                    final List<License> pre20Licenses = new ArrayList<>();
-                    if (parser.nextToken() == XContentParser.Token.START_ARRAY) {
-                        while (parser.nextToken() != XContentParser.Token.END_ARRAY) {
-                            pre20Licenses.add(License.fromXContent(parser));
-                        }
-                        // take the latest issued unexpired license
-                        CollectionUtil.timSort(pre20Licenses,
LATEST_ISSUE_DATE_FIRST); - long now = System.currentTimeMillis(); - for (License oldLicense : pre20Licenses) { - if (oldLicense.expiryDate() > now) { - license = oldLicense; - break; + try (InputStream byteStream = bytes.streamInput(); + XContentParser parser = xContentType.xContent() + .createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, byteStream)) + { + License license = null; + if (parser.nextToken() == XContentParser.Token.START_OBJECT) { + if (parser.nextToken() == XContentParser.Token.FIELD_NAME) { + String currentFieldName = parser.currentName(); + if (Fields.LICENSES.equals(currentFieldName)) { + final List pre20Licenses = new ArrayList<>(); + if (parser.nextToken() == XContentParser.Token.START_ARRAY) { + while (parser.nextToken() != XContentParser.Token.END_ARRAY) { + pre20Licenses.add(License.fromXContent(parser)); } + // take the latest issued unexpired license + CollectionUtil.timSort(pre20Licenses, LATEST_ISSUE_DATE_FIRST); + long now = System.currentTimeMillis(); + for (License oldLicense : pre20Licenses) { + if (oldLicense.expiryDate() > now) { + license = oldLicense; + break; + } + } + if (license == null && !pre20Licenses.isEmpty()) { + license = pre20Licenses.get(0); + } + } else { + throw new ElasticsearchParseException("failed to parse licenses expected an array of licenses"); } - if (license == null && !pre20Licenses.isEmpty()) { - license = pre20Licenses.get(0); - } - } else { - throw new ElasticsearchParseException("failed to parse licenses expected an array of licenses"); + } else if (Fields.LICENSE.equals(currentFieldName)) { + license = License.fromXContent(parser); } - } else if (Fields.LICENSE.equals(currentFieldName)) { - license = License.fromXContent(parser); + // Ignore all other fields - might be created with new version + } else { + throw new ElasticsearchParseException("failed to parse licenses expected field"); } - // Ignore all other fields - might be created with new version } else { - throw new ElasticsearchParseException("failed to parse licenses expected field"); + throw new ElasticsearchParseException("failed to parse licenses expected start object"); } - } else { - throw new ElasticsearchParseException("failed to parse licenses expected start object"); + return license; } - return license; } @Override diff --git a/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/PreviewDatafeedAction.java b/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/PreviewDatafeedAction.java index 22a8970069d..15fbe437548 100644 --- a/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/PreviewDatafeedAction.java +++ b/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/PreviewDatafeedAction.java @@ -22,6 +22,7 @@ import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; import java.io.IOException; +import java.io.InputStream; import java.util.Objects; public class PreviewDatafeedAction extends Action responseBody; if (getDocResponse.isExists()) { BytesReference docSource = getDocResponse.getSourceAsBytesRef(); - XContentParser parser = - XContentFactory.xContent(docSource) - .createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, - docSource.streamInput()); - MlFilter filter = MlFilter.PARSER.apply(parser, null).build(); - responseBody = new QueryPage<>(Collections.singletonList(filter), 1, MlFilter.RESULTS_FIELD); + try (InputStream stream = docSource.streamInput(); + XContentParser parser = + 
XContentFactory.xContent(docSource) + .createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, stream)) { + MlFilter filter = MlFilter.PARSER.apply(parser, null).build(); + responseBody = new QueryPage<>(Collections.singletonList(filter), 1, MlFilter.RESULTS_FIELD); - GetFiltersAction.Response filterResponse = new GetFiltersAction.Response(responseBody); - listener.onResponse(filterResponse); + GetFiltersAction.Response filterResponse = new GetFiltersAction.Response(responseBody); + listener.onResponse(filterResponse); + } } else { this.onFailure(QueryPage.emptyQueryPage(MlFilter.RESULTS_FIELD)); } @@ -122,8 +124,9 @@ public class TransportGetFiltersAction extends HandledTransportAction docs = new ArrayList<>(); for (SearchHit hit : response.getHits().getHits()) { BytesReference docSource = hit.getSourceRef(); - try (XContentParser parser = XContentFactory.xContent(docSource).createParser( - NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, docSource.streamInput())) { + try (InputStream stream = docSource.streamInput(); + XContentParser parser = XContentFactory.xContent(docSource).createParser( + NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, stream)) { docs.add(MlFilter.PARSER.apply(parser, null).build()); } catch (IOException e) { this.onFailure(e); diff --git a/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/BatchedBucketsIterator.java b/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/BatchedBucketsIterator.java index e2879017c91..441a5a31b8e 100644 --- a/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/BatchedBucketsIterator.java +++ b/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/BatchedBucketsIterator.java @@ -17,6 +17,7 @@ import org.elasticsearch.xpack.core.ml.job.results.Bucket; import org.elasticsearch.xpack.core.ml.job.results.Result; import java.io.IOException; +import java.io.InputStream; class BatchedBucketsIterator extends BatchedResultsIterator { @@ -27,14 +28,13 @@ class BatchedBucketsIterator extends BatchedResultsIterator { @Override protected Result map(SearchHit hit) { BytesReference source = hit.getSourceRef(); - XContentParser parser; - try { - parser = XContentFactory.xContent(source).createParser(NamedXContentRegistry.EMPTY, - LoggingDeprecationHandler.INSTANCE, source.streamInput()); + try (InputStream stream = source.streamInput(); + XContentParser parser = XContentFactory.xContent(source).createParser(NamedXContentRegistry.EMPTY, + LoggingDeprecationHandler.INSTANCE, stream)) { + Bucket bucket = Bucket.PARSER.apply(parser, null); + return new Result<>(hit.getIndex(), bucket); } catch (IOException e) { throw new ElasticsearchParseException("failed to parse bucket", e); } - Bucket bucket = Bucket.PARSER.apply(parser, null); - return new Result<>(hit.getIndex(), bucket); } } diff --git a/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/BatchedInfluencersIterator.java b/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/BatchedInfluencersIterator.java index e0e7392056b..d36fe6c9484 100644 --- a/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/BatchedInfluencersIterator.java +++ b/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/BatchedInfluencersIterator.java @@ -17,6 +17,7 @@ import org.elasticsearch.xpack.core.ml.job.results.Influencer; import org.elasticsearch.xpack.core.ml.job.results.Result; import java.io.IOException; +import java.io.InputStream; class 
BatchedInfluencersIterator extends BatchedResultsIterator { BatchedInfluencersIterator(Client client, String jobId) { @@ -26,15 +27,13 @@ class BatchedInfluencersIterator extends BatchedResultsIterator { @Override protected Result map(SearchHit hit) { BytesReference source = hit.getSourceRef(); - XContentParser parser; - try { - parser = XContentFactory.xContent(source).createParser(NamedXContentRegistry.EMPTY, - LoggingDeprecationHandler.INSTANCE, source.streamInput()); + try (InputStream stream = source.streamInput(); + XContentParser parser = XContentFactory.xContent(source).createParser(NamedXContentRegistry.EMPTY, + LoggingDeprecationHandler.INSTANCE, stream)) { + Influencer influencer = Influencer.PARSER.apply(parser, null); + return new Result<>(hit.getIndex(), influencer); } catch (IOException e) { throw new ElasticsearchParseException("failed to parser influencer", e); } - - Influencer influencer = Influencer.PARSER.apply(parser, null); - return new Result<>(hit.getIndex(), influencer); } } diff --git a/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/BatchedRecordsIterator.java b/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/BatchedRecordsIterator.java index 3c98625c8cb..a0203025939 100644 --- a/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/BatchedRecordsIterator.java +++ b/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/BatchedRecordsIterator.java @@ -17,6 +17,7 @@ import org.elasticsearch.xpack.core.ml.job.results.AnomalyRecord; import org.elasticsearch.xpack.core.ml.job.results.Result; import java.io.IOException; +import java.io.InputStream; class BatchedRecordsIterator extends BatchedResultsIterator { @@ -27,14 +28,13 @@ class BatchedRecordsIterator extends BatchedResultsIterator { @Override protected Result map(SearchHit hit) { BytesReference source = hit.getSourceRef(); - XContentParser parser; - try { - parser = XContentFactory.xContent(source).createParser(NamedXContentRegistry.EMPTY, - LoggingDeprecationHandler.INSTANCE, source.streamInput()); + try (InputStream stream = source.streamInput(); + XContentParser parser = XContentFactory.xContent(source).createParser(NamedXContentRegistry.EMPTY, + LoggingDeprecationHandler.INSTANCE, stream)){ + AnomalyRecord record = AnomalyRecord.PARSER.apply(parser, null); + return new Result<>(hit.getIndex(), record); } catch (IOException e) { throw new ElasticsearchParseException("failed to parse record", e); } - AnomalyRecord record = AnomalyRecord.PARSER.apply(parser, null); - return new Result<>(hit.getIndex(), record); } } diff --git a/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobProvider.java b/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobProvider.java index 89539309642..4e476dd645a 100644 --- a/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobProvider.java +++ b/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobProvider.java @@ -101,6 +101,7 @@ import org.elasticsearch.xpack.core.ml.utils.MlIndicesUtils; import org.elasticsearch.xpack.core.security.support.Exceptions; import java.io.IOException; +import java.io.InputStream; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -474,8 +475,9 @@ public class JobProvider { private T parseSearchHit(SearchHit hit, BiFunction objectParser, Consumer errorHandler) { BytesReference source = hit.getSourceRef(); - try (XContentParser parser = XContentFactory.xContent(source) - 
.createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, source.streamInput())) { + try (InputStream stream = source.streamInput(); + XContentParser parser = XContentFactory.xContent(source) + .createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, stream)) { return objectParser.apply(parser, null); } catch (IOException e) { errorHandler.accept(new ElasticsearchParseException("failed to parse " + hit.getType(), e)); @@ -487,8 +489,9 @@ public class JobProvider { Consumer errorHandler) { BytesReference source = getResponse.getSourceAsBytesRef(); - try (XContentParser parser = XContentFactory.xContent(XContentType.JSON) - .createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, source.streamInput())) { + try (InputStream stream = source.streamInput(); + XContentParser parser = XContentFactory.xContent(XContentType.JSON) + .createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, stream)) { return objectParser.apply(parser, null); } catch (IOException e) { errorHandler.accept(new ElasticsearchParseException("failed to parse " + getResponse.getType(), e)); @@ -523,8 +526,9 @@ public class JobProvider { List results = new ArrayList<>(); for (SearchHit hit : hits.getHits()) { BytesReference source = hit.getSourceRef(); - try (XContentParser parser = XContentFactory.xContent(source) - .createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, source.streamInput())) { + try (InputStream stream = source.streamInput(); + XContentParser parser = XContentFactory.xContent(source) + .createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, stream)) { Bucket bucket = Bucket.PARSER.apply(parser, null); results.add(bucket); } catch (IOException e) { @@ -654,8 +658,9 @@ public class JobProvider { List results = new ArrayList<>(hits.length); for (SearchHit hit : hits) { BytesReference source = hit.getSourceRef(); - try (XContentParser parser = XContentFactory.xContent(source) - .createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, source.streamInput())) { + try (InputStream stream = source.streamInput(); + XContentParser parser = XContentFactory.xContent(source) + .createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, stream)) { CategoryDefinition categoryDefinition = CategoryDefinition.PARSER.apply(parser, null); results.add(categoryDefinition); } catch (IOException e) { @@ -688,8 +693,9 @@ public class JobProvider { List results = new ArrayList<>(); for (SearchHit hit : searchResponse.getHits().getHits()) { BytesReference source = hit.getSourceRef(); - try (XContentParser parser = XContentFactory.xContent(source) - .createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, source.streamInput())) { + try (InputStream stream = source.streamInput(); + XContentParser parser = XContentFactory.xContent(source) + .createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, stream)) { results.add(AnomalyRecord.PARSER.apply(parser, null)); } catch (IOException e) { throw new ElasticsearchParseException("failed to parse records", e); @@ -736,8 +742,9 @@ public class JobProvider { List influencers = new ArrayList<>(); for (SearchHit hit : response.getHits().getHits()) { BytesReference source = hit.getSourceRef(); - try (XContentParser parser = XContentFactory.xContent(source) - .createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, source.streamInput())) { + try (InputStream stream 
= source.streamInput(); + XContentParser parser = XContentFactory.xContent(source) + .createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, stream)) { influencers.add(Influencer.PARSER.apply(parser, null)); } catch (IOException e) { throw new ElasticsearchParseException("failed to parse influencer", e); @@ -880,8 +887,9 @@ public class JobProvider { for (SearchHit hit : searchResponse.getHits().getHits()) { BytesReference source = hit.getSourceRef(); - try (XContentParser parser = XContentFactory.xContent(source) - .createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, source.streamInput())) { + try (InputStream stream = source.streamInput(); + XContentParser parser = XContentFactory.xContent(source) + .createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, stream)) { ModelPlot modelPlot = ModelPlot.PARSER.apply(parser, null); results.add(modelPlot); } catch (IOException e) { @@ -1214,10 +1222,11 @@ public class JobProvider { if (getDocResponse.isExists()) { BytesReference docSource = getDocResponse.getSourceAsBytesRef(); - try (XContentParser parser = + try (InputStream stream = docSource.streamInput(); + XContentParser parser = XContentFactory.xContent(docSource) .createParser(NamedXContentRegistry.EMPTY, - LoggingDeprecationHandler.INSTANCE, docSource.streamInput())) { + LoggingDeprecationHandler.INSTANCE, stream)) { Calendar calendar = Calendar.PARSER.apply(parser, null).build(); listener.onResponse(calendar); } diff --git a/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/logging/CppLogMessageHandler.java b/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/logging/CppLogMessageHandler.java index f17171d71e9..44560bf117d 100644 --- a/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/logging/CppLogMessageHandler.java +++ b/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/logging/CppLogMessageHandler.java @@ -220,9 +220,9 @@ public class CppLogMessageHandler implements Closeable { } private void parseMessage(XContent xContent, BytesReference bytesRef) { - try { - XContentParser parser = xContent - .createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, bytesRef.streamInput()); + try (InputStream stream = bytesRef.streamInput(); + XContentParser parser = xContent + .createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, stream)) { CppLogMessage msg = CppLogMessage.PARSER.apply(parser, null); Level level = Level.getLevel(msg.getLevel()); if (level == null) { diff --git a/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/output/NormalizerResultHandler.java b/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/output/NormalizerResultHandler.java index 226c5b52f61..dcadef7a24b 100644 --- a/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/output/NormalizerResultHandler.java +++ b/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/output/NormalizerResultHandler.java @@ -80,10 +80,12 @@ public class NormalizerResultHandler extends AbstractComponent { } private void parseResult(XContent xContent, BytesReference bytesRef) throws IOException { - XContentParser parser = xContent - .createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, bytesRef.streamInput()); - NormalizerResult result = NormalizerResult.PARSER.apply(parser, null); - normalizedResults.add(result); + try (InputStream stream = bytesRef.streamInput(); + 
XContentParser parser = xContent + .createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, stream)) { + NormalizerResult result = NormalizerResult.PARSER.apply(parser, null); + normalizedResults.add(result); + } } private static int findNextMarker(byte marker, BytesReference bytesRef, int from) { diff --git a/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredForecastsRemover.java b/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredForecastsRemover.java index 45938e4ea87..ed3368f7c33 100644 --- a/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredForecastsRemover.java +++ b/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredForecastsRemover.java @@ -36,6 +36,7 @@ import org.joda.time.DateTime; import org.joda.time.chrono.ISOChronology; import java.io.IOException; +import java.io.InputStream; import java.util.ArrayList; import java.util.List; import java.util.Objects; @@ -117,11 +118,13 @@ public class ExpiredForecastsRemover implements MlDataRemover { } for (SearchHit hit : hits.getHits()) { - XContentParser parser = XContentFactory.xContent(XContentType.JSON).createParser( - NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, hit.getSourceRef().streamInput()); - ForecastRequestStats forecastRequestStats = ForecastRequestStats.PARSER.apply(parser, null); - if (forecastRequestStats.getExpiryTime().toEpochMilli() < cutoffEpochMs) { - forecastsToDelete.add(forecastRequestStats); + try (InputStream stream = hit.getSourceRef().streamInput(); + XContentParser parser = XContentFactory.xContent(XContentType.JSON).createParser( + NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, stream)) { + ForecastRequestStats forecastRequestStats = ForecastRequestStats.PARSER.apply(parser, null); + if (forecastRequestStats.getExpiryTime().toEpochMilli() < cutoffEpochMs) { + forecastsToDelete.add(forecastRequestStats); + } } } return forecastsToDelete; diff --git a/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/exporter/FilteredMonitoringDoc.java b/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/exporter/FilteredMonitoringDoc.java index 97204b2ec22..152e57fb531 100644 --- a/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/exporter/FilteredMonitoringDoc.java +++ b/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/exporter/FilteredMonitoringDoc.java @@ -16,6 +16,7 @@ import org.elasticsearch.xpack.core.monitoring.MonitoredSystem; import org.elasticsearch.xpack.core.monitoring.exporter.MonitoringDoc; import java.io.IOException; +import java.io.InputStream; import java.util.Set; import static org.elasticsearch.common.xcontent.NamedXContentRegistry.EMPTY; @@ -60,7 +61,8 @@ public abstract class FilteredMonitoringDoc extends MonitoringDoc { try (XContentBuilder filteredBuilder = new XContentBuilder(xContent, out, filters)) { super.toXContent(filteredBuilder, params); } - try (XContentParser parser = xContent.createParser(EMPTY, LoggingDeprecationHandler.INSTANCE, out.bytes().streamInput())) { + try (InputStream stream = out.bytes().streamInput(); + XContentParser parser = xContent.createParser(EMPTY, LoggingDeprecationHandler.INSTANCE, stream)) { return builder.copyCurrentStructure(parser); } } diff --git a/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/RollupRequestTranslator.java b/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/RollupRequestTranslator.java index d00f05fdcc0..16b2127fc45 
100644 --- a/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/RollupRequestTranslator.java +++ b/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/RollupRequestTranslator.java @@ -9,6 +9,7 @@ package org.elasticsearch.xpack.rollup; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.TermQueryBuilder; @@ -588,8 +589,9 @@ public class RollupRequestTranslator { try { output.writeString(metric.getType()); metric.writeTo(output); - try (NamedWriteableAwareStreamInput in = - new NamedWriteableAwareStreamInput(output.bytes().streamInput(), registry)) { + try (StreamInput stream = output.bytes().streamInput(); + NamedWriteableAwareStreamInput in = + new NamedWriteableAwareStreamInput(stream, registry)) { ValuesSourceAggregationBuilder serialized = ((ValuesSourceAggregationBuilder)in.readNamedWriteable(AggregationBuilder.class)) diff --git a/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/support/mapper/NativeRoleMappingStore.java b/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/support/mapper/NativeRoleMappingStore.java index 8277b68b37c..0fcaf297c0f 100644 --- a/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/support/mapper/NativeRoleMappingStore.java +++ b/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/support/mapper/NativeRoleMappingStore.java @@ -40,6 +40,7 @@ import org.elasticsearch.xpack.security.authc.support.CachingUsernamePasswordRea import org.elasticsearch.xpack.security.authc.support.UserRoleMapper; import java.io.IOException; +import java.io.InputStream; import java.util.Arrays; import java.util.Collection; import java.util.Collections; @@ -149,7 +150,9 @@ public class NativeRoleMappingStore extends AbstractComponent implements UserRol } private ExpressionRoleMapping buildMapping(String id, BytesReference source) { - try (XContentParser parser = getParser(source)) { + try (InputStream stream = source.streamInput(); + XContentParser parser = XContentType.JSON.xContent() + .createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, stream)) { return ExpressionRoleMapping.parse(id, parser); } catch (Exception e) { logger.warn(new ParameterizedMessage("Role mapping [{}] cannot be parsed and will be skipped", id), e); @@ -157,11 +160,6 @@ public class NativeRoleMappingStore extends AbstractComponent implements UserRol } } - private static XContentParser getParser(BytesReference source) throws IOException { - return XContentType.JSON.xContent() - .createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, source.streamInput()); - } - /** * Stores (create or update) a single mapping in the index */ diff --git a/plugin/sql/sql-shared-client/src/main/java/org/elasticsearch/xpack/sql/client/HttpClient.java b/plugin/sql/sql-shared-client/src/main/java/org/elasticsearch/xpack/sql/client/HttpClient.java index bc0aa053a8b..eaac650c61c 100644 --- a/plugin/sql/sql-shared-client/src/main/java/org/elasticsearch/xpack/sql/client/HttpClient.java +++ b/plugin/sql/sql-shared-client/src/main/java/org/elasticsearch/xpack/sql/client/HttpClient.java @@ -157,8 +157,9 @@ public class HttpClient { private Response 
fromXContent(XContentType xContentType, BytesReference bytesReference, CheckedFunction responseParser) { - try (XContentParser parser = xContentType.xContent().createParser(registry, - LoggingDeprecationHandler.INSTANCE, bytesReference.streamInput())) { + try (InputStream stream = bytesReference.streamInput(); + XContentParser parser = xContentType.xContent().createParser(registry, + LoggingDeprecationHandler.INSTANCE, stream)) { return responseParser.apply(parser); } catch (IOException ex) { throw new ClientException("Cannot parse response", ex); diff --git a/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/input/http/ExecutableHttpInput.java b/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/input/http/ExecutableHttpInput.java index a6ec309a224..3831a519acb 100644 --- a/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/input/http/ExecutableHttpInput.java +++ b/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/input/http/ExecutableHttpInput.java @@ -23,6 +23,7 @@ import org.elasticsearch.xpack.watcher.common.text.TextTemplateEngine; import org.elasticsearch.xpack.watcher.support.Variables; import org.elasticsearch.xpack.watcher.support.XContentFilterKeysUtils; +import java.io.InputStream; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -80,8 +81,9 @@ public class ExecutableHttpInput extends ExecutableInput errors = new ArrayList<>(); // EMPTY is safe here because we never call namedObject - try (XContentParser parser = JsonXContent.jsonXContent - .createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, response.body().streamInput())) { + try (InputStream stream = response.body().streamInput(); + XContentParser parser = JsonXContent.jsonXContent + .createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, stream)) { XContentParser.Token token = parser.currentToken(); if (token == null) { token = parser.nextToken(); diff --git a/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/notification/pagerduty/SentEvent.java b/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/notification/pagerduty/SentEvent.java index d39ad05ab48..abc150c8b40 100644 --- a/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/notification/pagerduty/SentEvent.java +++ b/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/notification/pagerduty/SentEvent.java @@ -19,6 +19,7 @@ import org.elasticsearch.xpack.watcher.common.http.HttpResponse; import org.elasticsearch.xpack.watcher.actions.pagerduty.PagerDutyAction; import java.io.IOException; +import java.io.InputStream; import java.util.ArrayList; import java.util.List; import java.util.Objects; @@ -102,10 +103,10 @@ public class SentEvent implements ToXContentObject { // lets first try to parse the error response in the body // based on https://developer.pagerduty.com/documentation/rest/errors - try { - // EMPTY is safe here because we never call namedObject - XContentParser parser = JsonXContent.jsonXContent - .createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, response.body().streamInput()); + try (InputStream stream = response.body().streamInput(); + XContentParser parser = JsonXContent.jsonXContent + // EMPTY is safe here because we never call namedObject + .createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, stream)) { parser.nextToken(); String message = null; diff --git 
a/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/support/search/WatcherSearchTemplateService.java b/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/support/search/WatcherSearchTemplateService.java
index 2769226c1d7..fa645ab72ef 100644
--- a/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/support/search/WatcherSearchTemplateService.java
+++ b/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/support/search/WatcherSearchTemplateService.java
@@ -24,6 +24,7 @@ import org.elasticsearch.xpack.watcher.Watcher;
 import org.elasticsearch.xpack.watcher.support.Variables;
 import java.io.IOException;
+import java.io.InputStream;
 import java.util.Map;
 /**
@@ -64,8 +65,9 @@ public class WatcherSearchTemplateService extends AbstractComponent {
         SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();
         BytesReference source = request.getSearchSource();
         if (source != null && source.length() > 0) {
-            try (XContentParser parser = XContentFactory.xContent(source)
-                    .createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, source.streamInput())) {
+            try (InputStream stream = source.streamInput();
+                 XContentParser parser = XContentFactory.xContent(source)
+                         .createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, stream)) {
                 sourceBuilder.parseXContent(parser);
                 searchRequest.source(sourceBuilder);
             }
diff --git a/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/watch/WatchParser.java b/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/watch/WatchParser.java
index 813d3bf4a88..52491310582 100644
--- a/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/watch/WatchParser.java
+++ b/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/watch/WatchParser.java
@@ -37,6 +37,7 @@ import org.elasticsearch.xpack.watcher.trigger.TriggerService;
 import org.joda.time.DateTime;
 import java.io.IOException;
+import java.io.InputStream;
 import java.time.Clock;
 import java.util.Collections;
 import java.util.HashMap;
@@ -108,9 +109,10 @@ public class WatchParser extends AbstractComponent {
             logger.trace("parsing watch [{}] ", source.utf8ToString());
         }
         // EMPTY is safe here because we never use namedObject
-        try (WatcherXContentParser parser = new WatcherXContentParser(xContentType.xContent().createParser(NamedXContentRegistry.EMPTY,
-                LoggingDeprecationHandler.INSTANCE, source.streamInput()),
-                now, withSecrets ? cryptoService : null, allowRedactedPasswords)) {
+        try (InputStream stream = source.streamInput();
+             WatcherXContentParser parser = new WatcherXContentParser(xContentType.xContent().createParser(NamedXContentRegistry.EMPTY,
+                     LoggingDeprecationHandler.INSTANCE, stream),
+                     now, withSecrets ? cryptoService : null, allowRedactedPasswords)) {
             parser.nextToken();
             return parse(id, includeStatus, parser);
         } catch (IOException ioe) {
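
For readers skimming the patch, here is a minimal, self-contained sketch of the
pattern this commit applies in every file above, assuming the Elasticsearch
6.x-era dependencies this branch builds against. The class name, method names,
and the BytesArray sample input are invented for this note; the parsing calls
(BytesReference.streamInput(), createParser(...), LoggingDeprecationHandler.INSTANCE,
NamedXContentRegistry.EMPTY) mirror the ones used in the diffs.

    import java.io.IOException;
    import java.io.InputStream;
    import java.util.Map;

    import org.elasticsearch.common.bytes.BytesArray;
    import org.elasticsearch.common.bytes.BytesReference;
    import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
    import org.elasticsearch.common.xcontent.NamedXContentRegistry;
    import org.elasticsearch.common.xcontent.XContentParser;
    import org.elasticsearch.common.xcontent.XContentType;

    public class StreamInputParsingSketch {

        // Leak-prone shape used before this change: the InputStream obtained from
        // streamInput() is never explicitly closed, and nothing closes it at all
        // if createParser(...) or the parsing step throws.
        static Map<String, Object> parseLeaky(BytesReference source) throws IOException {
            XContentParser parser = XContentType.JSON.xContent()
                    .createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, source.streamInput());
            return parser.map();
        }

        // Shape this commit moves to: the stream and the parser are both resources
        // of one try-with-resources block, so both are closed on every exit path.
        static Map<String, Object> parseClosed(BytesReference source) throws IOException {
            try (InputStream stream = source.streamInput();
                 XContentParser parser = XContentType.JSON.xContent()
                         .createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, stream)) {
                return parser.map();
            }
        }

        public static void main(String[] args) throws IOException {
            BytesReference source = new BytesArray("{\"field\":\"value\"}");
            System.out.println(parseClosed(source)); // prints {field=value}
        }
    }

The individual hunks follow this same two-resource shape, only with the concrete
parser type (XContentParser, WatcherXContentParser, NamedWriteableAwareStreamInput)
and the concrete byte source (search hit, get response, HTTP response body) of
each call site.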