More Efficient Blobstore Metdata IO (#55777) (#55788)

No need to copy all these bytes multiple times, especially not when
writing a multiple MB global cluster state snapshot through this method.
This commit is contained in:
Armin Braun 2020-04-27 11:48:53 +02:00 committed by GitHub
parent d56f25acb4
commit fe9904fbea
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 25 additions and 23 deletions

View File

@ -24,6 +24,7 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
import java.io.IOException; import java.io.IOException;
import java.io.OutputStream;
public interface Compressor { public interface Compressor {
@ -37,5 +38,5 @@ public interface Compressor {
* Creates a new stream output that compresses the contents and writes to the provided stream * Creates a new stream output that compresses the contents and writes to the provided stream
* output. Closing the returned {@link StreamOutput} will close the provided stream output. * output. Closing the returned {@link StreamOutput} will close the provided stream output.
*/ */
StreamOutput streamOutput(StreamOutput out) throws IOException; StreamOutput streamOutput(OutputStream out) throws IOException;
} }

View File

@ -107,8 +107,8 @@ public class DeflateCompressor implements Compressor {
} }
@Override @Override
public StreamOutput streamOutput(StreamOutput out) throws IOException { public StreamOutput streamOutput(OutputStream out) throws IOException {
out.writeBytes(HEADER); out.write(HEADER);
final boolean nowrap = true; final boolean nowrap = true;
final Deflater deflater = new Deflater(LEVEL, nowrap); final Deflater deflater = new Deflater(LEVEL, nowrap);
final boolean syncFlush = true; final boolean syncFlush = true;

View File

@ -22,12 +22,15 @@ import org.apache.lucene.codecs.CodecUtil;
import org.apache.lucene.index.CorruptIndexException; import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.index.IndexFormatTooNewException; import org.apache.lucene.index.IndexFormatTooNewException;
import org.apache.lucene.index.IndexFormatTooOldException; import org.apache.lucene.index.IndexFormatTooOldException;
import org.apache.lucene.store.ByteBuffersDataInput;
import org.apache.lucene.store.ByteBuffersIndexInput;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.OutputStreamIndexOutput; import org.apache.lucene.store.OutputStreamIndexOutput;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.common.CheckedConsumer; import org.elasticsearch.common.CheckedConsumer;
import org.elasticsearch.common.CheckedFunction; import org.elasticsearch.common.CheckedFunction;
import org.elasticsearch.common.blobstore.BlobContainer; import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.compress.CompressorFactory; import org.elasticsearch.common.compress.CompressorFactory;
import org.elasticsearch.common.io.Streams; import org.elasticsearch.common.io.Streams;
@ -46,10 +49,10 @@ import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.gateway.CorruptStateException; import org.elasticsearch.gateway.CorruptStateException;
import org.elasticsearch.snapshots.SnapshotInfo; import org.elasticsearch.snapshots.SnapshotInfo;
import java.io.ByteArrayOutputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.OutputStream; import java.io.OutputStream;
import java.util.Arrays;
import java.util.HashMap; import java.util.HashMap;
import java.util.Locale; import java.util.Locale;
import java.util.Map; import java.util.Map;
@ -127,8 +130,10 @@ public final class ChecksumBlobStoreFormat<T extends ToXContent> {
public T readBlob(BlobContainer blobContainer, String blobName) throws IOException { public T readBlob(BlobContainer blobContainer, String blobName) throws IOException {
final BytesReference bytes = Streams.readFully(blobContainer.readBlob(blobName)); final BytesReference bytes = Streams.readFully(blobContainer.readBlob(blobName));
final String resourceDesc = "ChecksumBlobStoreFormat.readBlob(blob=\"" + blobName + "\")"; final String resourceDesc = "ChecksumBlobStoreFormat.readBlob(blob=\"" + blobName + "\")";
try (ByteArrayIndexInput indexInput = try {
new ByteArrayIndexInput(resourceDesc, BytesReference.toBytes(bytes))) { final IndexInput indexInput = bytes.length() > 0 ? new ByteBuffersIndexInput(
new ByteBuffersDataInput(Arrays.asList(BytesReference.toByteBuffers(bytes))), resourceDesc)
: new ByteArrayIndexInput(resourceDesc, BytesRef.EMPTY_BYTES);
CodecUtil.checksumEntireFile(indexInput); CodecUtil.checksumEntireFile(indexInput);
CodecUtil.checkHeader(indexInput, codec, VERSION, VERSION); CodecUtil.checkHeader(indexInput, codec, VERSION, VERSION);
long filePointer = indexInput.getFilePointer(); long filePointer = indexInput.getFilePointer();
@ -182,19 +187,9 @@ public final class ChecksumBlobStoreFormat<T extends ToXContent> {
}); });
} }
private void writeTo(final T obj, final String blobName, final CheckedConsumer<BytesArray, IOException> consumer) throws IOException { private void writeTo(final T obj, final String blobName,
final BytesReference bytes; final CheckedConsumer<BytesReference, IOException> consumer) throws IOException {
try (BytesStreamOutput bytesStreamOutput = new BytesStreamOutput()) { try (BytesStreamOutput outputStream = new BytesStreamOutput()) {
if (compress) {
try (StreamOutput compressedStreamOutput = CompressorFactory.COMPRESSOR.streamOutput(bytesStreamOutput)) {
write(obj, compressedStreamOutput);
}
} else {
write(obj, bytesStreamOutput);
}
bytes = bytesStreamOutput.bytes();
}
try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) {
final String resourceDesc = "ChecksumBlobStoreFormat.writeBlob(blob=\"" + blobName + "\")"; final String resourceDesc = "ChecksumBlobStoreFormat.writeBlob(blob=\"" + blobName + "\")";
try (OutputStreamIndexOutput indexOutput = new OutputStreamIndexOutput(resourceDesc, blobName, outputStream, BUFFER_SIZE)) { try (OutputStreamIndexOutput indexOutput = new OutputStreamIndexOutput(resourceDesc, blobName, outputStream, BUFFER_SIZE)) {
CodecUtil.writeHeader(indexOutput, codec, VERSION); CodecUtil.writeHeader(indexOutput, codec, VERSION);
@ -205,15 +200,21 @@ public final class ChecksumBlobStoreFormat<T extends ToXContent> {
// in order to write the footer we need to prevent closing the actual index input. // in order to write the footer we need to prevent closing the actual index input.
} }
}) { }) {
bytes.writeTo(indexOutputOutputStream); if (compress) {
try (StreamOutput compressedStreamOutput = CompressorFactory.COMPRESSOR.streamOutput(indexOutputOutputStream)) {
write(obj, compressedStreamOutput);
}
} else {
write(obj, indexOutputOutputStream);
}
} }
CodecUtil.writeFooter(indexOutput); CodecUtil.writeFooter(indexOutput);
} }
consumer.accept(new BytesArray(outputStream.toByteArray())); consumer.accept(outputStream.bytes());
} }
} }
private void write(T obj, StreamOutput streamOutput) throws IOException { private void write(T obj, OutputStream streamOutput) throws IOException {
try (XContentBuilder builder = XContentFactory.contentBuilder(XContentType.SMILE, streamOutput)) { try (XContentBuilder builder = XContentFactory.contentBuilder(XContentType.SMILE, streamOutput)) {
builder.startObject(); builder.startObject();
obj.toXContent(builder, SNAPSHOT_ONLY_FORMAT_PARAMS); obj.toXContent(builder, SNAPSHOT_ONLY_FORMAT_PARAMS);