From fe9904fbea7d1de636bed13a57ee40489a80ed34 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Mon, 27 Apr 2020 11:48:53 +0200 Subject: [PATCH] More Efficient Blobstore Metdata IO (#55777) (#55788) No need to copy all these bytes multiple times, especially not when writing a multiple MB global cluster state snapshot through this method. --- .../common/compress/Compressor.java | 3 +- .../common/compress/DeflateCompressor.java | 4 +- .../blobstore/ChecksumBlobStoreFormat.java | 41 ++++++++++--------- 3 files changed, 25 insertions(+), 23 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/common/compress/Compressor.java b/server/src/main/java/org/elasticsearch/common/compress/Compressor.java index 81fa2f2f6db..d926a0b7dab 100644 --- a/server/src/main/java/org/elasticsearch/common/compress/Compressor.java +++ b/server/src/main/java/org/elasticsearch/common/compress/Compressor.java @@ -24,6 +24,7 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import java.io.IOException; +import java.io.OutputStream; public interface Compressor { @@ -37,5 +38,5 @@ public interface Compressor { * Creates a new stream output that compresses the contents and writes to the provided stream * output. Closing the returned {@link StreamOutput} will close the provided stream output. */ - StreamOutput streamOutput(StreamOutput out) throws IOException; + StreamOutput streamOutput(OutputStream out) throws IOException; } diff --git a/server/src/main/java/org/elasticsearch/common/compress/DeflateCompressor.java b/server/src/main/java/org/elasticsearch/common/compress/DeflateCompressor.java index 9f5e1e8d25e..1ba9d43ecda 100644 --- a/server/src/main/java/org/elasticsearch/common/compress/DeflateCompressor.java +++ b/server/src/main/java/org/elasticsearch/common/compress/DeflateCompressor.java @@ -107,8 +107,8 @@ public class DeflateCompressor implements Compressor { } @Override - public StreamOutput streamOutput(StreamOutput out) throws IOException { - out.writeBytes(HEADER); + public StreamOutput streamOutput(OutputStream out) throws IOException { + out.write(HEADER); final boolean nowrap = true; final Deflater deflater = new Deflater(LEVEL, nowrap); final boolean syncFlush = true; diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/ChecksumBlobStoreFormat.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/ChecksumBlobStoreFormat.java index c2eb352d031..f0e704aa241 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/ChecksumBlobStoreFormat.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/ChecksumBlobStoreFormat.java @@ -22,12 +22,15 @@ import org.apache.lucene.codecs.CodecUtil; import org.apache.lucene.index.CorruptIndexException; import org.apache.lucene.index.IndexFormatTooNewException; import org.apache.lucene.index.IndexFormatTooOldException; +import org.apache.lucene.store.ByteBuffersDataInput; +import org.apache.lucene.store.ByteBuffersIndexInput; +import org.apache.lucene.store.IndexInput; import org.apache.lucene.store.OutputStreamIndexOutput; +import org.apache.lucene.util.BytesRef; import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.common.CheckedConsumer; import org.elasticsearch.common.CheckedFunction; import org.elasticsearch.common.blobstore.BlobContainer; -import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.compress.CompressorFactory; import org.elasticsearch.common.io.Streams; @@ -46,10 +49,10 @@ import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.gateway.CorruptStateException; import org.elasticsearch.snapshots.SnapshotInfo; -import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.util.Arrays; import java.util.HashMap; import java.util.Locale; import java.util.Map; @@ -127,8 +130,10 @@ public final class ChecksumBlobStoreFormat { public T readBlob(BlobContainer blobContainer, String blobName) throws IOException { final BytesReference bytes = Streams.readFully(blobContainer.readBlob(blobName)); final String resourceDesc = "ChecksumBlobStoreFormat.readBlob(blob=\"" + blobName + "\")"; - try (ByteArrayIndexInput indexInput = - new ByteArrayIndexInput(resourceDesc, BytesReference.toBytes(bytes))) { + try { + final IndexInput indexInput = bytes.length() > 0 ? new ByteBuffersIndexInput( + new ByteBuffersDataInput(Arrays.asList(BytesReference.toByteBuffers(bytes))), resourceDesc) + : new ByteArrayIndexInput(resourceDesc, BytesRef.EMPTY_BYTES); CodecUtil.checksumEntireFile(indexInput); CodecUtil.checkHeader(indexInput, codec, VERSION, VERSION); long filePointer = indexInput.getFilePointer(); @@ -182,19 +187,9 @@ public final class ChecksumBlobStoreFormat { }); } - private void writeTo(final T obj, final String blobName, final CheckedConsumer consumer) throws IOException { - final BytesReference bytes; - try (BytesStreamOutput bytesStreamOutput = new BytesStreamOutput()) { - if (compress) { - try (StreamOutput compressedStreamOutput = CompressorFactory.COMPRESSOR.streamOutput(bytesStreamOutput)) { - write(obj, compressedStreamOutput); - } - } else { - write(obj, bytesStreamOutput); - } - bytes = bytesStreamOutput.bytes(); - } - try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) { + private void writeTo(final T obj, final String blobName, + final CheckedConsumer consumer) throws IOException { + try (BytesStreamOutput outputStream = new BytesStreamOutput()) { final String resourceDesc = "ChecksumBlobStoreFormat.writeBlob(blob=\"" + blobName + "\")"; try (OutputStreamIndexOutput indexOutput = new OutputStreamIndexOutput(resourceDesc, blobName, outputStream, BUFFER_SIZE)) { CodecUtil.writeHeader(indexOutput, codec, VERSION); @@ -205,15 +200,21 @@ public final class ChecksumBlobStoreFormat { // in order to write the footer we need to prevent closing the actual index input. } }) { - bytes.writeTo(indexOutputOutputStream); + if (compress) { + try (StreamOutput compressedStreamOutput = CompressorFactory.COMPRESSOR.streamOutput(indexOutputOutputStream)) { + write(obj, compressedStreamOutput); + } + } else { + write(obj, indexOutputOutputStream); + } } CodecUtil.writeFooter(indexOutput); } - consumer.accept(new BytesArray(outputStream.toByteArray())); + consumer.accept(outputStream.bytes()); } } - private void write(T obj, StreamOutput streamOutput) throws IOException { + private void write(T obj, OutputStream streamOutput) throws IOException { try (XContentBuilder builder = XContentFactory.contentBuilder(XContentType.SMILE, streamOutput)) { builder.startObject(); obj.toXContent(builder, SNAPSHOT_ONLY_FORMAT_PARAMS);