Unify Stream Copy Buffer Usage (#56078) (#60608)

We have various ways of copying between two streams and handling thread-local
buffers throughout the codebase. This commit unifies a number of them and
removes buffer allocations in many spots.
Armin Braun 2020-08-04 09:54:52 +02:00 committed by GitHub
parent 54aaadade7
commit 7ae9dc2092
10 changed files with 53 additions and 113 deletions


@@ -22,7 +22,6 @@ package org.elasticsearch.core.internal.io;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Objects;
/**
* Simple utility methods for file and stream copying.
@@ -34,19 +33,23 @@ import java.util.Objects;
*/
public class Streams {
private static final ThreadLocal<byte[]> buffer = ThreadLocal.withInitial(() -> new byte[8 * 1024]);
private Streams() {
}
/**
* Copy the contents of the given InputStream to the given OutputStream.
* Closes both streams when done.
* Copy the contents of the given InputStream to the given OutputStream. Optionally, closes both streams when done.
*
* @param in the stream to copy from
* @param out the stream to copy to
* @param in the stream to copy from
* @param out the stream to copy to
* @param close whether to close both streams after copying
* @param buffer buffer to use for copying
* @return the number of bytes copied
* @throws IOException in case of I/O errors
*/
public static long copy(final InputStream in, final OutputStream out) throws IOException {
Objects.requireNonNull(in, "No InputStream specified");
Objects.requireNonNull(out, "No OutputStream specified");
final byte[] buffer = new byte[8192];
public static long copy(final InputStream in, final OutputStream out, byte[] buffer, boolean close) throws IOException {
Exception err = null;
try {
long byteCount = 0;
@@ -61,7 +64,30 @@ public class Streams {
err = e;
throw e;
} finally {
IOUtils.close(err, in, out);
if (close) {
IOUtils.close(err, in, out);
}
}
}
/**
* @see #copy(InputStream, OutputStream, byte[], boolean)
*/
public static long copy(final InputStream in, final OutputStream out, boolean close) throws IOException {
return copy(in, out, buffer.get(), close);
}
/**
* @see #copy(InputStream, OutputStream, byte[], boolean)
*/
public static long copy(final InputStream in, final OutputStream out, byte[] buffer) throws IOException {
return copy(in, out, buffer, true);
}
/**
* @see #copy(InputStream, OutputStream, byte[], boolean)
*/
public static long copy(final InputStream in, final OutputStream out) throws IOException {
return copy(in, out, buffer.get(), true);
}
}
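
For orientation, here is a minimal usage sketch of the unified overloads added above. Only the Streams.copy signatures come from this diff; the wrapper class and sample data are hypothetical.

import org.elasticsearch.core.internal.io.Streams;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

class CopyUsageSketch {
    static void example() throws IOException {
        byte[] data = "hello".getBytes(StandardCharsets.UTF_8);

        // Default overload: copies via the shared thread-local 8 KiB buffer and closes both streams.
        long copied = Streams.copy(new ByteArrayInputStream(data), new ByteArrayOutputStream());

        // close == false: leave both streams open, e.g. when more data will be written to the output.
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        Streams.copy(new ByteArrayInputStream(data), out, false);
        out.write('!');

        // Explicit buffer: size it to the input when the length is known up front (this overload closes both streams).
        Streams.copy(new ByteArrayInputStream(data), new ByteArrayOutputStream(), new byte[data.length]);

        assert copied == data.length;
    }
}

The overloads that take no buffer reuse the class's thread-local 8 KiB buffer, which is what removes the per-call allocations mentioned in the commit message.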


@@ -36,7 +36,7 @@ import org.elasticsearch.common.xcontent.XContentGenerator;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.support.filtering.FilterPathBasedFilter;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.core.internal.io.Streams;
import java.io.BufferedInputStream;
import java.io.IOException;
@@ -349,7 +349,7 @@ public class JsonXContentGenerator implements XContentGenerator {
} else {
writeStartRaw(name);
flush();
copyStream(content, os);
Streams.copy(content, os);
writeEndRaw();
}
}
@@ -364,24 +364,11 @@
generator.writeRaw(':');
}
flush();
transfer(stream, os);
Streams.copy(stream, os, false);
writeEndRaw();
}
}
// A basic copy of Java 9's InputStream#transferTo
private static long transfer(InputStream in, OutputStream out) throws IOException {
Objects.requireNonNull(out, "out");
long transferred = 0;
byte[] buffer = new byte[8192];
int read;
while ((read = in.read(buffer, 0, 8192)) >= 0) {
out.write(buffer, 0, read);
transferred += read;
}
return transferred;
}
private boolean mayWriteRawData(XContentType contentType) {
// When the current generator is filtered (ie filter != null)
// or the content is in a different format than the current generator,
@@ -480,37 +467,4 @@
public boolean isClosed() {
return generator.isClosed();
}
/**
* Copy the contents of the given InputStream to the given OutputStream.
* Closes both streams when done.
*
* @param in the stream to copy from
* @param out the stream to copy to
* @return the number of bytes copied
* @throws IOException in case of I/O errors
*/
private static long copyStream(InputStream in, OutputStream out) throws IOException {
Objects.requireNonNull(in, "No InputStream specified");
Objects.requireNonNull(out, "No OutputStream specified");
final byte[] buffer = new byte[8192];
boolean success = false;
try {
long byteCount = 0;
int bytesRead;
while ((bytesRead = in.read(buffer)) != -1) {
out.write(buffer, 0, bytesRead);
byteCount += bytesRead;
}
out.flush();
success = true;
return byteCount;
} finally {
if (success) {
IOUtils.close(in, out);
} else {
IOUtils.closeWhileHandlingException(in, out);
}
}
}
}
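
The writeRawValue path above now delegates to Streams.copy with close set to false, because the generator keeps writing to the same output after the raw value. A rough sketch of that pattern, with a hypothetical helper around the copy call:

import org.elasticsearch.core.internal.io.Streams;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

class RawSpliceSketch {
    // Writes "name":<raw JSON value> into an output that stays open for subsequent content.
    static void writeRawField(String name, InputStream rawJson, OutputStream os) throws IOException {
        os.write(('"' + name + "\":").getBytes(StandardCharsets.UTF_8));
        // close == false: the generator keeps writing to 'os' after the raw value,
        // so neither the source nor the target stream may be closed here.
        Streams.copy(rawJson, os, false);
    }
}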


@@ -292,7 +292,7 @@ class GoogleCloudStorageBlobStore implements BlobStore {
* It is not enough to wrap the call to Streams#copy, we have to wrap the privileged calls too; this is because Streams#copy
* is in the stacktrace and is not granted the permissions needed to close and write the channel.
*/
Streams.copy(inputStream, Channels.newOutputStream(new WritableByteChannel() {
org.elasticsearch.core.internal.io.Streams.copy(inputStream, Channels.newOutputStream(new WritableByteChannel() {
@SuppressForbidden(reason = "channel is based on a socket")
@Override
@@ -350,7 +350,7 @@ class GoogleCloudStorageBlobStore implements BlobStore {
throws IOException {
assert blobSize <= getLargeBlobThresholdInBytes() : "large blob uploads should use the resumable upload method";
final byte[] buffer = new byte[Math.toIntExact(blobSize)];
org.elasticsearch.common.io.Streams.readFully(inputStream, buffer);
Streams.readFully(inputStream, buffer);
try {
final Storage.BlobTargetOption[] targetOptions = failIfAlreadyExists ?
new Storage.BlobTargetOption[] { Storage.BlobTargetOption.doesNotExist() } :
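
The comment in the first hunk explains why wrapping only the Streams#copy call is insufficient: the copy frames sit on the stack while the channel is written and closed, so the channel operations themselves must run as privileged. Below is a hypothetical sketch of that wrapping idea using plain java.security APIs; the plugin's actual helper classes and channel wiring may differ.

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
import java.security.AccessController;
import java.security.PrivilegedActionException;
import java.security.PrivilegedExceptionAction;

final class PrivilegedChannelSketch implements WritableByteChannel {
    private final WritableByteChannel delegate;

    PrivilegedChannelSketch(WritableByteChannel delegate) {
        this.delegate = delegate;
    }

    @Override
    public int write(ByteBuffer src) throws IOException {
        // Run the socket write in a doPrivileged block so unprivileged frames
        // (such as Streams.copy) on the stack do not trigger an AccessControlException.
        try {
            return AccessController.doPrivileged((PrivilegedExceptionAction<Integer>) () -> delegate.write(src));
        } catch (PrivilegedActionException e) {
            throw (IOException) e.getCause();
        }
    }

    @Override
    public boolean isOpen() {
        return delegate.isOpen();
    }

    @Override
    public void close() throws IOException {
        try {
            AccessController.doPrivileged((PrivilegedExceptionAction<Void>) () -> {
                delegate.close();
                return null;
            });
        } catch (PrivilegedActionException e) {
            throw (IOException) e.getCause();
        }
    }
}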


@@ -165,7 +165,7 @@ public class FsBlobContainer extends AbstractBlobContainer {
channel.position(position);
}
assert channel.position() == position;
return org.elasticsearch.common.io.Streams.limitStream(Channels.newInputStream(channel), length);
return Streams.limitStream(Channels.newInputStream(channel), length);
}
@Override
@@ -212,7 +212,8 @@
private void writeToPath(InputStream inputStream, Path tempBlobPath, long blobSize) throws IOException {
try (OutputStream outputStream = Files.newOutputStream(tempBlobPath, StandardOpenOption.CREATE_NEW)) {
final int bufferSize = blobStore.bufferSizeInBytes();
Streams.copy(inputStream, outputStream, new byte[blobSize < bufferSize ? Math.toIntExact(blobSize) : bufferSize]);
org.elasticsearch.core.internal.io.Streams.copy(
inputStream, outputStream, new byte[blobSize < bufferSize ? Math.toIntExact(blobSize) : bufferSize]);
}
IOUtils.fsync(tempBlobPath, false);
}
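
The hunk above sizes the copy buffer to the smaller of the blob size and the store's configured buffer size, so small blobs no longer allocate a full-sized buffer. A condensed sketch of that method under those assumptions (parameter handling is simplified):

import org.elasticsearch.core.internal.io.Streams;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

class WriteBlobSketch {
    static void writeToPath(InputStream in, Path tempBlobPath, long blobSize, int bufferSize) throws IOException {
        try (OutputStream out = Files.newOutputStream(tempBlobPath, StandardOpenOption.CREATE_NEW)) {
            // Never allocate more than the blob actually needs, capped at the configured buffer size.
            byte[] buffer = new byte[blobSize < bufferSize ? Math.toIntExact(blobSize) : bufferSize];
            Streams.copy(in, out, buffer);
        }
    }
}

Note that the three-argument copy overload also closes both streams, which should be harmless alongside the try-with-resources since Closeable.close is specified to have no effect on an already closed stream.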


@@ -65,45 +65,6 @@ public abstract class Streams {
}
};
//---------------------------------------------------------------------
// Copy methods for java.io.InputStream / java.io.OutputStream
//---------------------------------------------------------------------
public static long copy(InputStream in, OutputStream out) throws IOException {
return copy(in, out, new byte[BUFFER_SIZE]);
}
/**
* Copy the contents of the given InputStream to the given OutputStream.
* Closes both streams when done.
*
* @param in the stream to copy from
* @param out the stream to copy to
* @return the number of bytes copied
* @throws IOException in case of I/O errors
*/
public static long copy(InputStream in, OutputStream out, byte[] buffer) throws IOException {
Objects.requireNonNull(in, "No InputStream specified");
Objects.requireNonNull(out, "No OutputStream specified");
// Leverage try-with-resources to close in and out so that exceptions in close() are either propagated or added as suppressed
// exceptions to the main exception
try (InputStream in2 = in; OutputStream out2 = out) {
return doCopy(in2, out2, buffer);
}
}
private static long doCopy(InputStream in, OutputStream out, byte[] buffer) throws IOException {
long byteCount = 0;
int bytesRead;
while ((bytesRead = in.read(buffer)) != -1) {
out.write(buffer, 0, bytesRead);
byteCount += bytesRead;
}
out.flush();
return byteCount;
}
/**
* Copy the contents of the given byte array to the given OutputStream.
* Closes the stream when done.
@@ -222,7 +183,7 @@
* Fully consumes the input stream, throwing the bytes away. Returns the number of bytes consumed.
*/
public static long consumeFully(InputStream inputStream) throws IOException {
return copy(inputStream, NULL_OUTPUT_STREAM);
return org.elasticsearch.core.internal.io.Streams.copy(inputStream, NULL_OUTPUT_STREAM);
}
public static List<String> readAllLines(InputStream input) throws IOException {
@@ -267,11 +228,9 @@
* Reads all bytes from the given {@link InputStream} and closes it afterwards.
*/
public static BytesReference readFully(InputStream in) throws IOException {
try (InputStream inputStream = in) {
BytesStreamOutput out = new BytesStreamOutput();
copy(inputStream, out);
return out.bytes();
}
BytesStreamOutput out = new BytesStreamOutput();
org.elasticsearch.core.internal.io.Streams.copy(in, out);
return out.bytes();
}
/**
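
A small usage sketch for the readFully and consumeFully helpers touched above; the sample inputs are made up, only the two method signatures come from this file.

import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.Streams;

import java.io.ByteArrayInputStream;
import java.io.IOException;

class ReadFullySketch {
    static void example() throws IOException {
        // readFully: drain the stream into a BytesReference; the underlying copy closes the stream.
        BytesReference bytes = Streams.readFully(new ByteArrayInputStream(new byte[]{1, 2, 3}));

        // consumeFully: drain and discard, returning only the number of bytes consumed.
        long consumed = Streams.consumeFully(new ByteArrayInputStream(new byte[16]));

        assert bytes.length() == 3 && consumed == 16;
    }
}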


@@ -91,7 +91,7 @@ public class StreamsTests extends ESTestCase {
final int limit = randomIntBetween(0, bytes.length);
final BytesArray stuffArray = new BytesArray(bytes);
final ByteArrayOutputStream out = new ByteArrayOutputStream(bytes.length);
final long count = Streams.copy(Streams.limitStream(stuffArray.streamInput(), limit), out);
final long count = org.elasticsearch.core.internal.io.Streams.copy(Streams.limitStream(stuffArray.streamInput(), limit), out);
assertEquals(limit, count);
assertThat(Arrays.equals(out.toByteArray(), Arrays.copyOf(bytes, limit)), equalTo(true));
}


@@ -28,13 +28,13 @@ import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.bytes.ReleasableBytesReference;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.PageCacheRecycler;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.core.internal.io.Streams;
import org.elasticsearch.test.ESTestCase;
import java.io.IOException;


@@ -29,13 +29,13 @@ import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.bytes.ReleasableBytesReference;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.PageCacheRecycler;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.core.internal.io.Streams;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;


@@ -10,12 +10,12 @@ import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.compress.NotXContentException;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.core.internal.io.Streams;
import java.io.ByteArrayOutputStream;
import java.io.IOException;


@@ -53,7 +53,6 @@ import org.elasticsearch.common.cache.Cache;
import org.elasticsearch.common.cache.CacheBuilder;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.hash.MessageDigests;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.InputStreamStreamInput;
import org.elasticsearch.common.io.stream.OutputStreamStreamOutput;
@@ -70,6 +69,7 @@ import org.elasticsearch.common.util.iterable.Iterables;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.core.internal.io.Streams;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.index.query.BoolQueryBuilder;