Dry up inputstream to bytesreference (#43675) (#44094)

* Dry up Reading InputStream to BytesReference
* Dry up spots where we use the same pattern to get from an InputStream to a BytesReferences
This commit is contained in:
Armin Braun 2019-07-09 09:18:25 +02:00 committed by GitHub
parent f1ebb82031
commit 9eac5ceb1b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 52 additions and 90 deletions

View File

@ -21,11 +21,9 @@ package org.elasticsearch.common.compress;
import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.core.internal.io.Streams;
import java.io.IOException; import java.io.IOException;
import java.util.Objects; import java.util.Objects;
@ -93,10 +91,6 @@ public class CompressorFactory {
} }
private static BytesReference uncompress(BytesReference bytes, Compressor compressor) throws IOException { private static BytesReference uncompress(BytesReference bytes, Compressor compressor) throws IOException {
StreamInput compressed = compressor.streamInput(bytes.streamInput()); return Streams.readFully(compressor.streamInput(bytes.streamInput()));
BytesStreamOutput bStream = new BytesStreamOutput();
Streams.copy(compressed, bStream);
compressed.close();
return bStream.bytes();
} }
} }

View File

@ -21,6 +21,7 @@ package org.elasticsearch.common.io;
import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.BytesStream; import org.elasticsearch.common.io.stream.BytesStream;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
import java.io.BufferedReader; import java.io.BufferedReader;
@ -226,6 +227,17 @@ public abstract class Streams {
return new FlushOnCloseOutputStream(os); return new FlushOnCloseOutputStream(os);
} }
/**
* Reads all bytes from the given {@link InputStream} and closes it afterwards.
*/
public static BytesReference readFully(InputStream in) throws IOException {
try (InputStream inputStream = in) {
BytesStreamOutput out = new BytesStreamOutput();
copy(inputStream, out);
return out.bytes();
}
}
/** /**
* A wrapper around a {@link BytesStream} that makes the close operation a flush. This is * A wrapper around a {@link BytesStream} that makes the close operation a flush. This is
* needed as sometimes a stream will be closed but the bytes that the stream holds still need * needed as sometimes a stream will be closed but the bytes that the stream holds still need

View File

@ -50,6 +50,7 @@ import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.compress.NotXContentException; import org.elasticsearch.common.compress.NotXContentException;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.lucene.store.InputStreamIndexInput; import org.elasticsearch.common.lucene.store.InputStreamIndexInput;
@ -60,10 +61,8 @@ import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.core.internal.io.Streams;
import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.snapshots.IndexShardRestoreFailedException; import org.elasticsearch.index.snapshots.IndexShardRestoreFailedException;
@ -671,27 +670,18 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
final String snapshotsIndexBlobName = INDEX_FILE_PREFIX + Long.toString(indexGen); final String snapshotsIndexBlobName = INDEX_FILE_PREFIX + Long.toString(indexGen);
RepositoryData repositoryData; RepositoryData repositoryData;
try (InputStream blob = blobContainer().readBlob(snapshotsIndexBlobName)) {
BytesStreamOutput out = new BytesStreamOutput();
Streams.copy(blob, out);
// EMPTY is safe here because RepositoryData#fromXContent calls namedObject // EMPTY is safe here because RepositoryData#fromXContent calls namedObject
try (XContentParser parser = XContentHelper.createParser(NamedXContentRegistry.EMPTY, try (InputStream blob = blobContainer().readBlob(snapshotsIndexBlobName);
LoggingDeprecationHandler.INSTANCE, out.bytes(), XContentType.JSON)) { XContentParser parser = XContentType.JSON.xContent().createParser(NamedXContentRegistry.EMPTY,
LoggingDeprecationHandler.INSTANCE, blob)) {
repositoryData = RepositoryData.snapshotsFromXContent(parser, indexGen); repositoryData = RepositoryData.snapshotsFromXContent(parser, indexGen);
} catch (NotXContentException e) {
logger.warn("[{}] index blob is not valid x-content [{} bytes]", snapshotsIndexBlobName, out.bytes().length());
throw e;
}
} }
// now load the incompatible snapshot ids, if they exist // now load the incompatible snapshot ids, if they exist
try (InputStream blob = blobContainer().readBlob(INCOMPATIBLE_SNAPSHOTS_BLOB)) { try (InputStream blob = blobContainer().readBlob(INCOMPATIBLE_SNAPSHOTS_BLOB);
BytesStreamOutput out = new BytesStreamOutput(); XContentParser parser = XContentType.JSON.xContent().createParser(NamedXContentRegistry.EMPTY,
Streams.copy(blob, out); LoggingDeprecationHandler.INSTANCE, blob)) {
try (XContentParser parser = XContentHelper.createParser(NamedXContentRegistry.EMPTY,
LoggingDeprecationHandler.INSTANCE, out.bytes(), XContentType.JSON)) {
repositoryData = repositoryData.incompatibleSnapshotsFromXContent(parser); repositoryData = repositoryData.incompatibleSnapshotsFromXContent(parser);
}
} catch (NoSuchFileException e) { } catch (NoSuchFileException e) {
if (isReadOnly()) { if (isReadOnly()) {
logger.debug("[{}] Incompatible snapshots blob [{}] does not exist, the likely " + logger.debug("[{}] Incompatible snapshots blob [{}] does not exist, the likely " +
@ -804,11 +794,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
// package private for testing // package private for testing
long readSnapshotIndexLatestBlob() throws IOException { long readSnapshotIndexLatestBlob() throws IOException {
try (InputStream blob = blobContainer().readBlob(INDEX_LATEST_BLOB)) { return Numbers.bytesToLong(Streams.readFully(blobContainer().readBlob(INDEX_LATEST_BLOB)).toBytesRef());
BytesStreamOutput out = new BytesStreamOutput();
Streams.copy(blob, out);
return Numbers.bytesToLong(out.bytes().toBytesRef());
}
} }
private long listBlobsToGetLatestIndexId() throws IOException { private long listBlobsToGetLatestIndexId() throws IOException {

View File

@ -30,6 +30,7 @@ import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.compress.CompressorFactory; import org.elasticsearch.common.compress.CompressorFactory;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.lucene.store.ByteArrayIndexInput; import org.elasticsearch.common.lucene.store.ByteArrayIndexInput;
@ -42,7 +43,6 @@ import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.core.internal.io.Streams;
import org.elasticsearch.gateway.CorruptStateException; import org.elasticsearch.gateway.CorruptStateException;
import org.elasticsearch.snapshots.SnapshotInfo; import org.elasticsearch.snapshots.SnapshotInfo;
@ -149,18 +149,16 @@ public final class ChecksumBlobStoreFormat<T extends ToXContent> {
* @param blobName blob name * @param blobName blob name
*/ */
public T readBlob(BlobContainer blobContainer, String blobName) throws IOException { public T readBlob(BlobContainer blobContainer, String blobName) throws IOException {
try (InputStream inputStream = blobContainer.readBlob(blobName)) { final BytesReference bytes = Streams.readFully(blobContainer.readBlob(blobName));
ByteArrayOutputStream out = new ByteArrayOutputStream();
Streams.copy(inputStream, out);
final byte[] bytes = out.toByteArray();
final String resourceDesc = "ChecksumBlobStoreFormat.readBlob(blob=\"" + blobName + "\")"; final String resourceDesc = "ChecksumBlobStoreFormat.readBlob(blob=\"" + blobName + "\")";
try (ByteArrayIndexInput indexInput = new ByteArrayIndexInput(resourceDesc, bytes)) { try (ByteArrayIndexInput indexInput =
new ByteArrayIndexInput(resourceDesc, BytesReference.toBytes(bytes))) {
CodecUtil.checksumEntireFile(indexInput); CodecUtil.checksumEntireFile(indexInput);
CodecUtil.checkHeader(indexInput, codec, VERSION, VERSION); CodecUtil.checkHeader(indexInput, codec, VERSION, VERSION);
long filePointer = indexInput.getFilePointer(); long filePointer = indexInput.getFilePointer();
long contentSize = indexInput.length() - CodecUtil.footerLength() - filePointer; long contentSize = indexInput.length() - CodecUtil.footerLength() - filePointer;
try (XContentParser parser = XContentHelper.createParser(namedXContentRegistry, LoggingDeprecationHandler.INSTANCE, try (XContentParser parser = XContentHelper.createParser(namedXContentRegistry, LoggingDeprecationHandler.INSTANCE,
new BytesArray(bytes, (int) filePointer, (int) contentSize))) { bytes.slice((int) filePointer, (int) contentSize))) {
return reader.apply(parser); return reader.apply(parser);
} }
} catch (CorruptIndexException | IndexFormatTooOldException | IndexFormatTooNewException ex) { } catch (CorruptIndexException | IndexFormatTooOldException | IndexFormatTooNewException ex) {
@ -168,7 +166,6 @@ public final class ChecksumBlobStoreFormat<T extends ToXContent> {
throw new CorruptStateException(ex); throw new CorruptStateException(ex);
} }
} }
}
/** /**
* Writes blob in atomic manner with resolving the blob name using {@link #blobName} method. * Writes blob in atomic manner with resolving the blob name using {@link #blobName} method.

View File

@ -22,14 +22,11 @@ import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.common.Strings; import org.elasticsearch.common.Strings;
import org.elasticsearch.common.blobstore.BlobContainer; import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.core.internal.io.Streams;
import org.elasticsearch.repositories.IndexId; import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.repositories.RepositoriesService; import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.repositories.RepositoryData; import org.elasticsearch.repositories.RepositoryData;
@ -81,15 +78,11 @@ public final class BlobStoreTestUtil {
} }
assertIndexGenerations(blobContainer, latestGen); assertIndexGenerations(blobContainer, latestGen);
final RepositoryData repositoryData; final RepositoryData repositoryData;
try (InputStream inputStream = blobContainer.readBlob("index-" + latestGen); try (InputStream blob = blobContainer.readBlob("index-" + latestGen);
BytesStreamOutput out = new BytesStreamOutput()) { XContentParser parser = XContentType.JSON.xContent().createParser(NamedXContentRegistry.EMPTY,
Streams.copy(inputStream, out); LoggingDeprecationHandler.INSTANCE, blob)) {
try (XContentParser parser =
XContentHelper.createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE,
out.bytes(), XContentType.JSON)) {
repositoryData = RepositoryData.snapshotsFromXContent(parser, latestGen); repositoryData = RepositoryData.snapshotsFromXContent(parser, latestGen);
} }
}
assertIndexUUIDs(blobContainer, repositoryData); assertIndexUUIDs(blobContainer, repositoryData);
assertSnapshotUUIDs(blobContainer, repositoryData); assertSnapshotUUIDs(blobContainer, repositoryData);
listener.onResponse(null); listener.onResponse(null);

View File

@ -16,17 +16,15 @@ import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.compress.CompressedXContent; import org.elasticsearch.common.compress.CompressedXContent;
import org.elasticsearch.common.compress.NotXContentException; import org.elasticsearch.common.compress.NotXContentException;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.core.internal.io.Streams;
import java.io.ByteArrayOutputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream;
import java.util.Map; import java.util.Map;
import java.util.function.Predicate; import java.util.function.Predicate;
import java.util.regex.Pattern; import java.util.regex.Pattern;
@ -73,12 +71,7 @@ public class TemplateUtils {
* Loads a resource from the classpath and returns it as a {@link BytesReference} * Loads a resource from the classpath and returns it as a {@link BytesReference}
*/ */
public static BytesReference load(String name) throws IOException { public static BytesReference load(String name) throws IOException {
try (InputStream is = TemplateUtils.class.getResourceAsStream(name)) { return Streams.readFully(TemplateUtils.class.getResourceAsStream(name));
try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
Streams.copy(is, out);
return new BytesArray(out.toByteArray());
}
}
} }
/** /**

View File

@ -11,7 +11,7 @@ import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.Client; import org.elasticsearch.client.Client;
import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.ToXContent;
@ -19,7 +19,6 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentElasticsearchExtension; import org.elasticsearch.common.xcontent.XContentElasticsearchExtension;
import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.core.internal.io.Streams;
import org.elasticsearch.index.mapper.DateFieldMapper; import org.elasticsearch.index.mapper.DateFieldMapper;
import org.elasticsearch.rest.RestStatus; import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.xpack.core.ml.action.FlushJobAction; import org.elasticsearch.xpack.core.ml.action.FlushJobAction;
@ -38,7 +37,6 @@ import org.elasticsearch.xpack.ml.datafeed.delayeddatacheck.DelayedDataDetectorF
import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory; import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory;
import org.elasticsearch.xpack.ml.notifications.Auditor; import org.elasticsearch.xpack.ml.notifications.Auditor;
import java.io.ByteArrayOutputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.util.Date; import java.util.Date;
@ -407,9 +405,7 @@ class DatafeedJob {
throws IOException { throws IOException {
PostDataAction.Request request = new PostDataAction.Request(jobId); PostDataAction.Request request = new PostDataAction.Request(jobId);
request.setDataDescription(dataDescription); request.setDataDescription(dataDescription);
ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); request.setContent(Streams.readFully(inputStream), xContentType);
Streams.copy(inputStream, outputStream);
request.setContent(new BytesArray(outputStream.toByteArray()), xContentType);
try (ThreadContext.StoredContext ignore = client.threadPool().getThreadContext().stashWithOrigin(ML_ORIGIN)) { try (ThreadContext.StoredContext ignore = client.threadPool().getThreadContext().stashWithOrigin(ML_ORIGIN)) {
PostDataAction.Response response = client.execute(PostDataAction.INSTANCE, request).actionGet(); PostDataAction.Response response = client.execute(PostDataAction.INSTANCE, request).actionGet();
return response.getDataCounts(); return response.getDataCounts();

View File

@ -7,14 +7,11 @@ package org.elasticsearch.xpack.monitoring.exporter;
import org.elasticsearch.Version; import org.elasticsearch.Version;
import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.settings.SettingsException; import org.elasticsearch.common.settings.SettingsException;
import org.elasticsearch.core.internal.io.Streams;
import java.io.ByteArrayOutputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
import java.util.Locale; import java.util.Locale;
@ -124,13 +121,7 @@ public class ClusterAlertsUtil {
} }
private static BytesReference loadResource(final String resource) throws IOException { private static BytesReference loadResource(final String resource) throws IOException {
try (InputStream is = ClusterAlertsUtil.class.getResourceAsStream(resource)) { return Streams.readFully(ClusterAlertsUtil.class.getResourceAsStream(resource));
try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
Streams.copy(is, out);
return new BytesArray(out.toByteArray());
}
}
} }
/** /**