From 9eac5ceb1b458c7f79712d9deaf67e04246537d4 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Tue, 9 Jul 2019 09:18:25 +0200 Subject: [PATCH] Dry up inputstream to bytesreference (#43675) (#44094) * Dry up Reading InputStream to BytesReference * Dry up spots where we use the same pattern to get from an InputStream to a BytesReferences --- .../common/compress/CompressorFactory.java | 10 +---- .../org/elasticsearch/common/io/Streams.java | 12 ++++++ .../blobstore/BlobStoreRepository.java | 40 ++++++------------- .../blobstore/ChecksumBlobStoreFormat.java | 33 +++++++-------- .../blobstore/BlobStoreTestUtil.java | 15 ++----- .../xpack/core/template/TemplateUtils.java | 11 +---- .../xpack/ml/datafeed/DatafeedJob.java | 8 +--- .../exporter/ClusterAlertsUtil.java | 13 +----- 8 files changed, 52 insertions(+), 90 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/common/compress/CompressorFactory.java b/server/src/main/java/org/elasticsearch/common/compress/CompressorFactory.java index 3b1202fe66f..2ff2f4e95df 100644 --- a/server/src/main/java/org/elasticsearch/common/compress/CompressorFactory.java +++ b/server/src/main/java/org/elasticsearch/common/compress/CompressorFactory.java @@ -21,11 +21,9 @@ package org.elasticsearch.common.compress; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.bytes.BytesReference; -import org.elasticsearch.common.io.stream.BytesStreamOutput; -import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.Streams; import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.XContentType; -import org.elasticsearch.core.internal.io.Streams; import java.io.IOException; import java.util.Objects; @@ -93,10 +91,6 @@ public class CompressorFactory { } private static BytesReference uncompress(BytesReference bytes, Compressor compressor) throws IOException { - StreamInput compressed = compressor.streamInput(bytes.streamInput()); - BytesStreamOutput bStream = new BytesStreamOutput(); - Streams.copy(compressed, bStream); - compressed.close(); - return bStream.bytes(); + return Streams.readFully(compressor.streamInput(bytes.streamInput())); } } diff --git a/server/src/main/java/org/elasticsearch/common/io/Streams.java b/server/src/main/java/org/elasticsearch/common/io/Streams.java index 46a6956914f..4a8f2f5de5b 100644 --- a/server/src/main/java/org/elasticsearch/common/io/Streams.java +++ b/server/src/main/java/org/elasticsearch/common/io/Streams.java @@ -21,6 +21,7 @@ package org.elasticsearch.common.io; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.stream.BytesStream; +import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.StreamOutput; import java.io.BufferedReader; @@ -226,6 +227,17 @@ public abstract class Streams { return new FlushOnCloseOutputStream(os); } + /** + * Reads all bytes from the given {@link InputStream} and closes it afterwards. + */ + public static BytesReference readFully(InputStream in) throws IOException { + try (InputStream inputStream = in) { + BytesStreamOutput out = new BytesStreamOutput(); + copy(inputStream, out); + return out.bytes(); + } + } + /** * A wrapper around a {@link BytesStream} that makes the close operation a flush. This is * needed as sometimes a stream will be closed but the bytes that the stream holds still need diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index 85b1721c978..54dec86af4b 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -50,6 +50,7 @@ import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.compress.NotXContentException; +import org.elasticsearch.common.io.Streams; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.lucene.store.InputStreamIndexInput; @@ -60,10 +61,8 @@ import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.XContentFactory; -import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentType; -import org.elasticsearch.core.internal.io.Streams; import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.snapshots.IndexShardRestoreFailedException; @@ -671,32 +670,23 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp final String snapshotsIndexBlobName = INDEX_FILE_PREFIX + Long.toString(indexGen); RepositoryData repositoryData; - try (InputStream blob = blobContainer().readBlob(snapshotsIndexBlobName)) { - BytesStreamOutput out = new BytesStreamOutput(); - Streams.copy(blob, out); - // EMPTY is safe here because RepositoryData#fromXContent calls namedObject - try (XContentParser parser = XContentHelper.createParser(NamedXContentRegistry.EMPTY, - LoggingDeprecationHandler.INSTANCE, out.bytes(), XContentType.JSON)) { - repositoryData = RepositoryData.snapshotsFromXContent(parser, indexGen); - } catch (NotXContentException e) { - logger.warn("[{}] index blob is not valid x-content [{} bytes]", snapshotsIndexBlobName, out.bytes().length()); - throw e; - } + // EMPTY is safe here because RepositoryData#fromXContent calls namedObject + try (InputStream blob = blobContainer().readBlob(snapshotsIndexBlobName); + XContentParser parser = XContentType.JSON.xContent().createParser(NamedXContentRegistry.EMPTY, + LoggingDeprecationHandler.INSTANCE, blob)) { + repositoryData = RepositoryData.snapshotsFromXContent(parser, indexGen); } // now load the incompatible snapshot ids, if they exist - try (InputStream blob = blobContainer().readBlob(INCOMPATIBLE_SNAPSHOTS_BLOB)) { - BytesStreamOutput out = new BytesStreamOutput(); - Streams.copy(blob, out); - try (XContentParser parser = XContentHelper.createParser(NamedXContentRegistry.EMPTY, - LoggingDeprecationHandler.INSTANCE, out.bytes(), XContentType.JSON)) { - repositoryData = repositoryData.incompatibleSnapshotsFromXContent(parser); - } + try (InputStream blob = blobContainer().readBlob(INCOMPATIBLE_SNAPSHOTS_BLOB); + XContentParser parser = XContentType.JSON.xContent().createParser(NamedXContentRegistry.EMPTY, + LoggingDeprecationHandler.INSTANCE, blob)) { + repositoryData = repositoryData.incompatibleSnapshotsFromXContent(parser); } catch (NoSuchFileException e) { if (isReadOnly()) { logger.debug("[{}] Incompatible snapshots blob [{}] does not exist, the likely " + - "reason is that there are no incompatible snapshots in the repository", - metadata.name(), INCOMPATIBLE_SNAPSHOTS_BLOB); + "reason is that there are no incompatible snapshots in the repository", + metadata.name(), INCOMPATIBLE_SNAPSHOTS_BLOB); } else { // write an empty incompatible-snapshots blob - we do this so that there // is a blob present, which helps speed up some cloud-based repositories @@ -804,11 +794,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp // package private for testing long readSnapshotIndexLatestBlob() throws IOException { - try (InputStream blob = blobContainer().readBlob(INDEX_LATEST_BLOB)) { - BytesStreamOutput out = new BytesStreamOutput(); - Streams.copy(blob, out); - return Numbers.bytesToLong(out.bytes().toBytesRef()); - } + return Numbers.bytesToLong(Streams.readFully(blobContainer().readBlob(INDEX_LATEST_BLOB)).toBytesRef()); } private long listBlobsToGetLatestIndexId() throws IOException { diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/ChecksumBlobStoreFormat.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/ChecksumBlobStoreFormat.java index 16751399a18..dfca516dbdc 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/ChecksumBlobStoreFormat.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/ChecksumBlobStoreFormat.java @@ -30,6 +30,7 @@ import org.elasticsearch.common.blobstore.BlobContainer; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.compress.CompressorFactory; +import org.elasticsearch.common.io.Streams; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.lucene.store.ByteArrayIndexInput; @@ -42,7 +43,6 @@ import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentType; -import org.elasticsearch.core.internal.io.Streams; import org.elasticsearch.gateway.CorruptStateException; import org.elasticsearch.snapshots.SnapshotInfo; @@ -149,24 +149,21 @@ public final class ChecksumBlobStoreFormat { * @param blobName blob name */ public T readBlob(BlobContainer blobContainer, String blobName) throws IOException { - try (InputStream inputStream = blobContainer.readBlob(blobName)) { - ByteArrayOutputStream out = new ByteArrayOutputStream(); - Streams.copy(inputStream, out); - final byte[] bytes = out.toByteArray(); - final String resourceDesc = "ChecksumBlobStoreFormat.readBlob(blob=\"" + blobName + "\")"; - try (ByteArrayIndexInput indexInput = new ByteArrayIndexInput(resourceDesc, bytes)) { - CodecUtil.checksumEntireFile(indexInput); - CodecUtil.checkHeader(indexInput, codec, VERSION, VERSION); - long filePointer = indexInput.getFilePointer(); - long contentSize = indexInput.length() - CodecUtil.footerLength() - filePointer; - try (XContentParser parser = XContentHelper.createParser(namedXContentRegistry, LoggingDeprecationHandler.INSTANCE, - new BytesArray(bytes, (int) filePointer, (int) contentSize))) { - return reader.apply(parser); - } - } catch (CorruptIndexException | IndexFormatTooOldException | IndexFormatTooNewException ex) { - // we trick this into a dedicated exception with the original stacktrace - throw new CorruptStateException(ex); + final BytesReference bytes = Streams.readFully(blobContainer.readBlob(blobName)); + final String resourceDesc = "ChecksumBlobStoreFormat.readBlob(blob=\"" + blobName + "\")"; + try (ByteArrayIndexInput indexInput = + new ByteArrayIndexInput(resourceDesc, BytesReference.toBytes(bytes))) { + CodecUtil.checksumEntireFile(indexInput); + CodecUtil.checkHeader(indexInput, codec, VERSION, VERSION); + long filePointer = indexInput.getFilePointer(); + long contentSize = indexInput.length() - CodecUtil.footerLength() - filePointer; + try (XContentParser parser = XContentHelper.createParser(namedXContentRegistry, LoggingDeprecationHandler.INSTANCE, + bytes.slice((int) filePointer, (int) contentSize))) { + return reader.apply(parser); } + } catch (CorruptIndexException | IndexFormatTooOldException | IndexFormatTooNewException ex) { + // we trick this into a dedicated exception with the original stacktrace + throw new CorruptStateException(ex); } } diff --git a/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreTestUtil.java b/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreTestUtil.java index d75345bf718..f6b4159b514 100644 --- a/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreTestUtil.java +++ b/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreTestUtil.java @@ -22,14 +22,11 @@ import org.elasticsearch.action.ActionRunnable; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.common.Strings; import org.elasticsearch.common.blobstore.BlobContainer; -import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; import org.elasticsearch.common.xcontent.NamedXContentRegistry; -import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentType; -import org.elasticsearch.core.internal.io.Streams; import org.elasticsearch.repositories.IndexId; import org.elasticsearch.repositories.RepositoriesService; import org.elasticsearch.repositories.RepositoryData; @@ -81,14 +78,10 @@ public final class BlobStoreTestUtil { } assertIndexGenerations(blobContainer, latestGen); final RepositoryData repositoryData; - try (InputStream inputStream = blobContainer.readBlob("index-" + latestGen); - BytesStreamOutput out = new BytesStreamOutput()) { - Streams.copy(inputStream, out); - try (XContentParser parser = - XContentHelper.createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, - out.bytes(), XContentType.JSON)) { - repositoryData = RepositoryData.snapshotsFromXContent(parser, latestGen); - } + try (InputStream blob = blobContainer.readBlob("index-" + latestGen); + XContentParser parser = XContentType.JSON.xContent().createParser(NamedXContentRegistry.EMPTY, + LoggingDeprecationHandler.INSTANCE, blob)) { + repositoryData = RepositoryData.snapshotsFromXContent(parser, latestGen); } assertIndexUUIDs(blobContainer, repositoryData); assertSnapshotUUIDs(blobContainer, repositoryData); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/template/TemplateUtils.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/template/TemplateUtils.java index 893c91f056c..6b25f7855f1 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/template/TemplateUtils.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/template/TemplateUtils.java @@ -16,17 +16,15 @@ import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.compress.CompressedXContent; import org.elasticsearch.common.compress.NotXContentException; +import org.elasticsearch.common.io.Streams; import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentType; -import org.elasticsearch.core.internal.io.Streams; -import java.io.ByteArrayOutputStream; import java.io.IOException; -import java.io.InputStream; import java.util.Map; import java.util.function.Predicate; import java.util.regex.Pattern; @@ -73,12 +71,7 @@ public class TemplateUtils { * Loads a resource from the classpath and returns it as a {@link BytesReference} */ public static BytesReference load(String name) throws IOException { - try (InputStream is = TemplateUtils.class.getResourceAsStream(name)) { - try (ByteArrayOutputStream out = new ByteArrayOutputStream()) { - Streams.copy(is, out); - return new BytesArray(out.toByteArray()); - } - } + return Streams.readFully(TemplateUtils.class.getResourceAsStream(name)); } /** diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java index db6e0947147..4a9e4fd41d9 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java @@ -11,7 +11,7 @@ import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.client.Client; -import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.io.Streams; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.xcontent.ToXContent; @@ -19,7 +19,6 @@ import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentElasticsearchExtension; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentType; -import org.elasticsearch.core.internal.io.Streams; import org.elasticsearch.index.mapper.DateFieldMapper; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.xpack.core.ml.action.FlushJobAction; @@ -38,7 +37,6 @@ import org.elasticsearch.xpack.ml.datafeed.delayeddatacheck.DelayedDataDetectorF import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory; import org.elasticsearch.xpack.ml.notifications.Auditor; -import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; import java.util.Date; @@ -407,9 +405,7 @@ class DatafeedJob { throws IOException { PostDataAction.Request request = new PostDataAction.Request(jobId); request.setDataDescription(dataDescription); - ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); - Streams.copy(inputStream, outputStream); - request.setContent(new BytesArray(outputStream.toByteArray()), xContentType); + request.setContent(Streams.readFully(inputStream), xContentType); try (ThreadContext.StoredContext ignore = client.threadPool().getThreadContext().stashWithOrigin(ML_ORIGIN)) { PostDataAction.Response response = client.execute(PostDataAction.INSTANCE, request).actionGet(); return response.getDataCounts(); diff --git a/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/exporter/ClusterAlertsUtil.java b/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/exporter/ClusterAlertsUtil.java index 782ecba1c30..2fe7e983a7a 100644 --- a/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/exporter/ClusterAlertsUtil.java +++ b/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/exporter/ClusterAlertsUtil.java @@ -7,14 +7,11 @@ package org.elasticsearch.xpack.monitoring.exporter; import org.elasticsearch.Version; import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.io.Streams; import org.elasticsearch.common.settings.SettingsException; -import org.elasticsearch.core.internal.io.Streams; -import java.io.ByteArrayOutputStream; import java.io.IOException; -import java.io.InputStream; import java.util.Arrays; import java.util.List; import java.util.Locale; @@ -124,13 +121,7 @@ public class ClusterAlertsUtil { } private static BytesReference loadResource(final String resource) throws IOException { - try (InputStream is = ClusterAlertsUtil.class.getResourceAsStream(resource)) { - try (ByteArrayOutputStream out = new ByteArrayOutputStream()) { - Streams.copy(is, out); - - return new BytesArray(out.toByteArray()); - } - } + return Streams.readFully(ClusterAlertsUtil.class.getResourceAsStream(resource)); } /**