diff --git a/libs/core/src/main/java/org/elasticsearch/core/internal/io/Streams.java b/libs/core/src/main/java/org/elasticsearch/core/internal/io/Streams.java index a006028b905..eb6114ffdb9 100644 --- a/libs/core/src/main/java/org/elasticsearch/core/internal/io/Streams.java +++ b/libs/core/src/main/java/org/elasticsearch/core/internal/io/Streams.java @@ -22,7 +22,6 @@ package org.elasticsearch.core.internal.io; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; -import java.util.Objects; /** * Simple utility methods for file and stream copying. @@ -34,19 +33,23 @@ import java.util.Objects; */ public class Streams { + private static final ThreadLocal buffer = ThreadLocal.withInitial(() -> new byte[8 * 1024]); + + private Streams() { + + } + /** - * Copy the contents of the given InputStream to the given OutputStream. - * Closes both streams when done. + * Copy the contents of the given InputStream to the given OutputStream. Optionally, closes both streams when done. * - * @param in the stream to copy from - * @param out the stream to copy to + * @param in the stream to copy from + * @param out the stream to copy to + * @param close whether to close both streams after copying + * @param buffer buffer to use for copying * @return the number of bytes copied * @throws IOException in case of I/O errors */ - public static long copy(final InputStream in, final OutputStream out) throws IOException { - Objects.requireNonNull(in, "No InputStream specified"); - Objects.requireNonNull(out, "No OutputStream specified"); - final byte[] buffer = new byte[8192]; + public static long copy(final InputStream in, final OutputStream out, byte[] buffer, boolean close) throws IOException { Exception err = null; try { long byteCount = 0; @@ -61,7 +64,30 @@ public class Streams { err = e; throw e; } finally { - IOUtils.close(err, in, out); + if (close) { + IOUtils.close(err, in, out); + } } } + + /** + * @see #copy(InputStream, OutputStream, byte[], boolean) + */ + public static long copy(final InputStream in, final OutputStream out, boolean close) throws IOException { + return copy(in, out, buffer.get(), close); + } + + /** + * @see #copy(InputStream, OutputStream, byte[], boolean) + */ + public static long copy(final InputStream in, final OutputStream out, byte[] buffer) throws IOException { + return copy(in, out, buffer, true); + } + + /** + * @see #copy(InputStream, OutputStream, byte[], boolean) + */ + public static long copy(final InputStream in, final OutputStream out) throws IOException { + return copy(in, out, buffer.get(), true); + } } diff --git a/libs/x-content/src/main/java/org/elasticsearch/common/xcontent/json/JsonXContentGenerator.java b/libs/x-content/src/main/java/org/elasticsearch/common/xcontent/json/JsonXContentGenerator.java index 97d25653ad6..65dc4136b9c 100644 --- a/libs/x-content/src/main/java/org/elasticsearch/common/xcontent/json/JsonXContentGenerator.java +++ b/libs/x-content/src/main/java/org/elasticsearch/common/xcontent/json/JsonXContentGenerator.java @@ -36,7 +36,7 @@ import org.elasticsearch.common.xcontent.XContentGenerator; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.common.xcontent.support.filtering.FilterPathBasedFilter; -import org.elasticsearch.core.internal.io.IOUtils; +import org.elasticsearch.core.internal.io.Streams; import java.io.BufferedInputStream; import java.io.IOException; @@ -349,7 +349,7 @@ public class JsonXContentGenerator implements XContentGenerator { } else { writeStartRaw(name); flush(); - copyStream(content, os); + Streams.copy(content, os); writeEndRaw(); } } @@ -364,24 +364,11 @@ public class JsonXContentGenerator implements XContentGenerator { generator.writeRaw(':'); } flush(); - transfer(stream, os); + Streams.copy(stream, os, false); writeEndRaw(); } } - // A basic copy of Java 9's InputStream#transferTo - private static long transfer(InputStream in, OutputStream out) throws IOException { - Objects.requireNonNull(out, "out"); - long transferred = 0; - byte[] buffer = new byte[8192]; - int read; - while ((read = in.read(buffer, 0, 8192)) >= 0) { - out.write(buffer, 0, read); - transferred += read; - } - return transferred; - } - private boolean mayWriteRawData(XContentType contentType) { // When the current generator is filtered (ie filter != null) // or the content is in a different format than the current generator, @@ -480,37 +467,4 @@ public class JsonXContentGenerator implements XContentGenerator { public boolean isClosed() { return generator.isClosed(); } - - /** - * Copy the contents of the given InputStream to the given OutputStream. - * Closes both streams when done. - * - * @param in the stream to copy from - * @param out the stream to copy to - * @return the number of bytes copied - * @throws IOException in case of I/O errors - */ - private static long copyStream(InputStream in, OutputStream out) throws IOException { - Objects.requireNonNull(in, "No InputStream specified"); - Objects.requireNonNull(out, "No OutputStream specified"); - final byte[] buffer = new byte[8192]; - boolean success = false; - try { - long byteCount = 0; - int bytesRead; - while ((bytesRead = in.read(buffer)) != -1) { - out.write(buffer, 0, bytesRead); - byteCount += bytesRead; - } - out.flush(); - success = true; - return byteCount; - } finally { - if (success) { - IOUtils.close(in, out); - } else { - IOUtils.closeWhileHandlingException(in, out); - } - } - } } diff --git a/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStore.java b/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStore.java index 145c578bc8f..bd21fe67d46 100644 --- a/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStore.java +++ b/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStore.java @@ -292,7 +292,7 @@ class GoogleCloudStorageBlobStore implements BlobStore { * It is not enough to wrap the call to Streams#copy, we have to wrap the privileged calls too; this is because Streams#copy * is in the stacktrace and is not granted the permissions needed to close and write the channel. */ - Streams.copy(inputStream, Channels.newOutputStream(new WritableByteChannel() { + org.elasticsearch.core.internal.io.Streams.copy(inputStream, Channels.newOutputStream(new WritableByteChannel() { @SuppressForbidden(reason = "channel is based on a socket") @Override @@ -350,7 +350,7 @@ class GoogleCloudStorageBlobStore implements BlobStore { throws IOException { assert blobSize <= getLargeBlobThresholdInBytes() : "large blob uploads should use the resumable upload method"; final byte[] buffer = new byte[Math.toIntExact(blobSize)]; - org.elasticsearch.common.io.Streams.readFully(inputStream, buffer); + Streams.readFully(inputStream, buffer); try { final Storage.BlobTargetOption[] targetOptions = failIfAlreadyExists ? new Storage.BlobTargetOption[] { Storage.BlobTargetOption.doesNotExist() } : diff --git a/server/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobContainer.java b/server/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobContainer.java index 4d0e99ef928..83b11a180da 100644 --- a/server/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobContainer.java +++ b/server/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobContainer.java @@ -165,7 +165,7 @@ public class FsBlobContainer extends AbstractBlobContainer { channel.position(position); } assert channel.position() == position; - return org.elasticsearch.common.io.Streams.limitStream(Channels.newInputStream(channel), length); + return Streams.limitStream(Channels.newInputStream(channel), length); } @Override @@ -212,7 +212,8 @@ public class FsBlobContainer extends AbstractBlobContainer { private void writeToPath(InputStream inputStream, Path tempBlobPath, long blobSize) throws IOException { try (OutputStream outputStream = Files.newOutputStream(tempBlobPath, StandardOpenOption.CREATE_NEW)) { final int bufferSize = blobStore.bufferSizeInBytes(); - Streams.copy(inputStream, outputStream, new byte[blobSize < bufferSize ? Math.toIntExact(blobSize) : bufferSize]); + org.elasticsearch.core.internal.io.Streams.copy( + inputStream, outputStream, new byte[blobSize < bufferSize ? Math.toIntExact(blobSize) : bufferSize]); } IOUtils.fsync(tempBlobPath, false); } diff --git a/server/src/main/java/org/elasticsearch/common/io/Streams.java b/server/src/main/java/org/elasticsearch/common/io/Streams.java index 9033d3e5f48..1422cb1e9c2 100644 --- a/server/src/main/java/org/elasticsearch/common/io/Streams.java +++ b/server/src/main/java/org/elasticsearch/common/io/Streams.java @@ -65,45 +65,6 @@ public abstract class Streams { } }; - //--------------------------------------------------------------------- - // Copy methods for java.io.InputStream / java.io.OutputStream - //--------------------------------------------------------------------- - - - public static long copy(InputStream in, OutputStream out) throws IOException { - return copy(in, out, new byte[BUFFER_SIZE]); - } - - /** - * Copy the contents of the given InputStream to the given OutputStream. - * Closes both streams when done. - * - * @param in the stream to copy from - * @param out the stream to copy to - * @return the number of bytes copied - * @throws IOException in case of I/O errors - */ - public static long copy(InputStream in, OutputStream out, byte[] buffer) throws IOException { - Objects.requireNonNull(in, "No InputStream specified"); - Objects.requireNonNull(out, "No OutputStream specified"); - // Leverage try-with-resources to close in and out so that exceptions in close() are either propagated or added as suppressed - // exceptions to the main exception - try (InputStream in2 = in; OutputStream out2 = out) { - return doCopy(in2, out2, buffer); - } - } - - private static long doCopy(InputStream in, OutputStream out, byte[] buffer) throws IOException { - long byteCount = 0; - int bytesRead; - while ((bytesRead = in.read(buffer)) != -1) { - out.write(buffer, 0, bytesRead); - byteCount += bytesRead; - } - out.flush(); - return byteCount; - } - /** * Copy the contents of the given byte array to the given OutputStream. * Closes the stream when done. @@ -222,7 +183,7 @@ public abstract class Streams { * Fully consumes the input stream, throwing the bytes away. Returns the number of bytes consumed. */ public static long consumeFully(InputStream inputStream) throws IOException { - return copy(inputStream, NULL_OUTPUT_STREAM); + return org.elasticsearch.core.internal.io.Streams.copy(inputStream, NULL_OUTPUT_STREAM); } public static List readAllLines(InputStream input) throws IOException { @@ -267,11 +228,9 @@ public abstract class Streams { * Reads all bytes from the given {@link InputStream} and closes it afterwards. */ public static BytesReference readFully(InputStream in) throws IOException { - try (InputStream inputStream = in) { - BytesStreamOutput out = new BytesStreamOutput(); - copy(inputStream, out); - return out.bytes(); - } + BytesStreamOutput out = new BytesStreamOutput(); + org.elasticsearch.core.internal.io.Streams.copy(in, out); + return out.bytes(); } /** diff --git a/server/src/test/java/org/elasticsearch/common/io/StreamsTests.java b/server/src/test/java/org/elasticsearch/common/io/StreamsTests.java index 30c8a9c6e49..d06a1ea1123 100644 --- a/server/src/test/java/org/elasticsearch/common/io/StreamsTests.java +++ b/server/src/test/java/org/elasticsearch/common/io/StreamsTests.java @@ -91,7 +91,7 @@ public class StreamsTests extends ESTestCase { final int limit = randomIntBetween(0, bytes.length); final BytesArray stuffArray = new BytesArray(bytes); final ByteArrayOutputStream out = new ByteArrayOutputStream(bytes.length); - final long count = Streams.copy(Streams.limitStream(stuffArray.streamInput(), limit), out); + final long count = org.elasticsearch.core.internal.io.Streams.copy(Streams.limitStream(stuffArray.streamInput(), limit), out); assertEquals(limit, count); assertThat(Arrays.equals(out.toByteArray(), Arrays.copyOf(bytes, limit)), equalTo(true)); } diff --git a/server/src/test/java/org/elasticsearch/transport/InboundPipelineTests.java b/server/src/test/java/org/elasticsearch/transport/InboundPipelineTests.java index e44340ab2b5..c108179ebed 100644 --- a/server/src/test/java/org/elasticsearch/transport/InboundPipelineTests.java +++ b/server/src/test/java/org/elasticsearch/transport/InboundPipelineTests.java @@ -28,13 +28,13 @@ import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.ReleasableBytesReference; import org.elasticsearch.common.collect.Tuple; -import org.elasticsearch.common.io.Streams; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.PageCacheRecycler; import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.core.internal.io.Streams; import org.elasticsearch.test.ESTestCase; import java.io.IOException; diff --git a/server/src/test/java/org/elasticsearch/transport/OutboundHandlerTests.java b/server/src/test/java/org/elasticsearch/transport/OutboundHandlerTests.java index e0533724519..9c8758916c7 100644 --- a/server/src/test/java/org/elasticsearch/transport/OutboundHandlerTests.java +++ b/server/src/test/java/org/elasticsearch/transport/OutboundHandlerTests.java @@ -29,13 +29,13 @@ import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.ReleasableBytesReference; import org.elasticsearch.common.collect.Tuple; -import org.elasticsearch.common.io.Streams; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.PageCacheRecycler; import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.core.internal.io.Streams; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/LifecyclePolicyUtils.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/LifecyclePolicyUtils.java index 2e66b618a2e..c849423776b 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/LifecyclePolicyUtils.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/LifecyclePolicyUtils.java @@ -10,12 +10,12 @@ import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.compress.NotXContentException; -import org.elasticsearch.common.io.Streams; import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.core.internal.io.Streams; import java.io.ByteArrayOutputStream; import java.io.IOException; diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/TokenService.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/TokenService.java index eb2e25ed849..a3764601a5e 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/TokenService.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/TokenService.java @@ -53,7 +53,6 @@ import org.elasticsearch.common.cache.Cache; import org.elasticsearch.common.cache.CacheBuilder; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.hash.MessageDigests; -import org.elasticsearch.common.io.Streams; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.InputStreamStreamInput; import org.elasticsearch.common.io.stream.OutputStreamStreamOutput; @@ -70,6 +69,7 @@ import org.elasticsearch.common.util.iterable.Iterables; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.core.internal.io.Streams; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.engine.VersionConflictEngineException; import org.elasticsearch.index.query.BoolQueryBuilder;