Remove Redundant Stream Wrapping from Compression (#62017) (#62132)

In many cases we don't need a `StreamInput` or `StreamOutput`
wrapper around these streams, so this commit adjusts the API
to use plain streams and adds the wrapping where necessary.
Armin Braun 2020-09-09 03:27:38 +02:00 committed by GitHub
parent 2ca5f98e05
commit ed4984a32e
16 changed files with 80 additions and 84 deletions
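
For orientation before the per-file diffs: the change swaps the `Compressor` API from Elasticsearch's stream wrappers to plain `java.io` streams, and call sites that still need the wrapper API add it explicitly. A minimal sketch distilled from the call sites below (the class name `WrappingSketch` is illustrative, not part of the commit):

```java
import java.io.IOException;

import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.compress.Compressor;
import org.elasticsearch.common.compress.CompressorFactory;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.InputStreamStreamInput;
import org.elasticsearch.common.io.stream.OutputStreamStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;

class WrappingSketch {
    // Read side: the compressor now returns a plain InputStream, so callers
    // that want StreamInput semantics add the wrapper themselves.
    static StreamInput decompressing(Compressor compressor, BytesReference bytes) throws IOException {
        return new InputStreamStreamInput(compressor.threadLocalInputStream(bytes.streamInput()));
    }

    // Write side: likewise, wrap the returned OutputStream only when
    // StreamOutput's typed write methods (writeBoolean, writeTo, ...) are needed.
    static StreamOutput compressing(BytesStreamOutput bStream) throws IOException {
        return new OutputStreamStreamOutput(CompressorFactory.COMPRESSOR.threadLocalOutputStream(bStream));
    }
}
```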

View File

@@ -34,8 +34,10 @@ import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.compress.Compressor;
 import org.elasticsearch.common.compress.CompressorFactory;
 import org.elasticsearch.common.io.stream.BytesStreamOutput;
+import org.elasticsearch.common.io.stream.InputStreamStreamInput;
 import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
 import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
+import org.elasticsearch.common.io.stream.OutputStreamStreamOutput;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.core.internal.io.IOUtils;
@@ -160,7 +162,7 @@ public class PublicationTransportHandler {
         StreamInput in = request.bytes().streamInput();
         try {
             if (compressor != null) {
-                in = compressor.threadLocalStreamInput(in);
+                in = new InputStreamStreamInput(compressor.threadLocalInputStream(in));
             }
             in = new NamedWriteableAwareStreamInput(in, namedWriteableRegistry);
             in.setVersion(request.version());
@@ -240,7 +242,7 @@ public class PublicationTransportHandler {
     private static BytesReference serializeFullClusterState(ClusterState clusterState, Version nodeVersion) throws IOException {
         final BytesStreamOutput bStream = new BytesStreamOutput();
-        try (StreamOutput stream = CompressorFactory.COMPRESSOR.threadLocalStreamOutput(bStream)) {
+        try (StreamOutput stream = new OutputStreamStreamOutput(CompressorFactory.COMPRESSOR.threadLocalOutputStream(bStream))) {
             stream.setVersion(nodeVersion);
             stream.writeBoolean(true);
             clusterState.writeTo(stream);
@@ -253,7 +255,7 @@ public class PublicationTransportHandler {
     private static BytesReference serializeDiffClusterState(Diff<ClusterState> diff, Version nodeVersion) throws IOException {
         final BytesStreamOutput bStream = new BytesStreamOutput();
-        try (StreamOutput stream = CompressorFactory.COMPRESSOR.threadLocalStreamOutput(bStream)) {
+        try (StreamOutput stream = new OutputStreamStreamOutput(CompressorFactory.COMPRESSOR.threadLocalOutputStream(bStream))) {
             stream.setVersion(nodeVersion);
             stream.writeBoolean(false);
             diff.writeTo(stream);

View File

@@ -71,7 +71,7 @@ public final class CompressedXContent {
      */
     public CompressedXContent(ToXContent xcontent, XContentType type, ToXContent.Params params) throws IOException {
         BytesStreamOutput bStream = new BytesStreamOutput();
-        OutputStream compressedStream = CompressorFactory.COMPRESSOR.threadLocalStreamOutput(bStream);
+        OutputStream compressedStream = CompressorFactory.COMPRESSOR.threadLocalOutputStream(bStream);
         CRC32 crc32 = new CRC32();
         OutputStream checkedStream = new CheckedOutputStream(compressedStream, crc32);
         try (XContentBuilder builder = XContentFactory.contentBuilder(type, checkedStream)) {

View File

@@ -20,10 +20,9 @@
 package org.elasticsearch.common.compress;

 import org.elasticsearch.common.bytes.BytesReference;
-import org.elasticsearch.common.io.stream.StreamInput;
-import org.elasticsearch.common.io.stream.StreamOutput;

 import java.io.IOException;
+import java.io.InputStream;
 import java.io.OutputStream;

 public interface Compressor {
@@ -33,20 +32,20 @@ public interface Compressor {
     int headerLength();

     /**
-     * Creates a new stream input that decompresses the contents read from the provided stream input.
-     * Closing the returned {@link StreamInput} will close the provided stream input.
+     * Creates a new input stream that decompresses the contents read from the provided input stream.
+     * Closing the returned {@link InputStream} will close the provided stream input.
      * Note: The returned stream may only be used on the thread that created it as it might use thread-local resources and must be safely
      * closed after use
      */
-    StreamInput threadLocalStreamInput(StreamInput in) throws IOException;
+    InputStream threadLocalInputStream(InputStream in) throws IOException;

     /**
-     * Creates a new stream output that compresses the contents and writes to the provided stream
-     * output. Closing the returned {@link StreamOutput} will close the provided stream output.
+     * Creates a new output stream that compresses the contents and writes to the provided output stream.
+     * Closing the returned {@link OutputStream} will close the provided output stream.
      * Note: The returned stream may only be used on the thread that created it as it might use thread-local resources and must be safely
      * closed after use
      */
-    StreamOutput threadLocalStreamOutput(OutputStream out) throws IOException;
+    OutputStream threadLocalOutputStream(OutputStream out) throws IOException;

     /**
      * Decompress bytes into a newly allocated buffer.
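
To make the revised contract concrete, here is a minimal round-trip sketch against the new interface, assuming the default deflate compressor behind `CompressorFactory.COMPRESSOR`; per the javadoc above, both streams are created, used, and closed on a single thread:

```java
import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

import org.elasticsearch.common.compress.Compressor;
import org.elasticsearch.common.compress.CompressorFactory;
import org.elasticsearch.common.io.stream.BytesStreamOutput;

public class CompressorRoundTrip {
    public static void main(String[] args) throws Exception {
        Compressor compressor = CompressorFactory.COMPRESSOR;
        byte[] payload = "hello".getBytes(StandardCharsets.UTF_8);

        // Compress: closing the returned stream finishes the deflate stream
        // and closes the underlying target.
        BytesStreamOutput compressed = new BytesStreamOutput();
        try (OutputStream out = compressor.threadLocalOutputStream(compressed)) {
            out.write(payload);
        }

        // Decompress: a plain InputStream is enough; no StreamInput wrapper needed.
        ByteArrayOutputStream restored = new ByteArrayOutputStream();
        try (InputStream in = compressor.threadLocalInputStream(compressed.bytes().streamInput())) {
            byte[] buf = new byte[1024];
            int n;
            while ((n = in.read(buf)) != -1) {
                restored.write(buf, 0, n);
            }
        }
        assert new String(restored.toByteArray(), StandardCharsets.UTF_8).equals("hello");
    }
}
```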

View File

@@ -22,10 +22,6 @@ package org.elasticsearch.common.compress;
 import org.elasticsearch.Assertions;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.io.stream.BytesStreamOutput;
-import org.elasticsearch.common.io.stream.InputStreamStreamInput;
-import org.elasticsearch.common.io.stream.OutputStreamStreamOutput;
-import org.elasticsearch.common.io.stream.StreamInput;
-import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.lease.Releasable;

 import java.io.BufferedInputStream;
@@ -128,8 +124,8 @@ public class DeflateCompressor implements Compressor {
     }

     @Override
-    public StreamInput threadLocalStreamInput(StreamInput in) throws IOException {
-        return new InputStreamStreamInput(inputStream(in, true));
+    public InputStream threadLocalInputStream(InputStream in) throws IOException {
+        return inputStream(in, true);
     }

     /**
@@ -187,7 +183,7 @@ public class DeflateCompressor implements Compressor {
     }

     @Override
-    public StreamOutput threadLocalStreamOutput(OutputStream out) throws IOException {
+    public OutputStream threadLocalOutputStream(OutputStream out) throws IOException {
         out.write(HEADER);
         final ReleasableReference<Deflater> current = deflaterForStreamRef.get();
         final Releasable releasable;
@@ -202,8 +198,7 @@ public class DeflateCompressor implements Compressor {
         }
         final boolean syncFlush = true;
         DeflaterOutputStream deflaterOutputStream = new DeflaterOutputStream(out, deflater, BUFFER_SIZE, syncFlush);
-        OutputStream compressedOut = new BufferedOutputStream(deflaterOutputStream, BUFFER_SIZE);
-        return new OutputStreamStreamOutput(compressedOut) {
+        return new BufferedOutputStream(deflaterOutputStream, BUFFER_SIZE) {
             // Due to https://bugs.openjdk.java.net/browse/JDK-8054565 we can't rely on the buffered output stream to only close once
             // in code that has to support Java 8 so we manually manage a close flag for this stream.
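
The body of the anonymous subclass is truncated in this hunk; based on the comment above, the close-once guard presumably looks roughly like this (the `closed` flag and the `releasable` cleanup are inferred, not verbatim from the commit):

```java
// Sketch: BufferedOutputStream.close() can be invoked twice on Java 8
// (JDK-8054565), so track closing manually and release the thread-local
// Deflater exactly once.
return new BufferedOutputStream(deflaterOutputStream, BUFFER_SIZE) {
    private boolean closed = false;

    @Override
    public void close() throws IOException {
        if (closed == false) {
            closed = true;
            try {
                super.close(); // flushes the buffer and finishes the deflate stream
            } finally {
                releasable.close(); // hand the Deflater back for reuse
            }
        }
    }
};
```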

View File

@@ -51,7 +51,7 @@ public class XContentHelper {
                                                                         BytesReference bytes) throws IOException {
         Compressor compressor = CompressorFactory.compressor(bytes);
         if (compressor != null) {
-            InputStream compressedInput = compressor.threadLocalStreamInput(bytes.streamInput());
+            InputStream compressedInput = compressor.threadLocalInputStream(bytes.streamInput());
             if (compressedInput.markSupported() == false) {
                 compressedInput = new BufferedInputStream(compressedInput);
             }
@@ -70,7 +70,7 @@ public class XContentHelper {
         Objects.requireNonNull(xContentType);
         Compressor compressor = CompressorFactory.compressor(bytes);
         if (compressor != null) {
-            InputStream compressedInput = compressor.threadLocalStreamInput(bytes.streamInput());
+            InputStream compressedInput = compressor.threadLocalInputStream(bytes.streamInput());
             if (compressedInput.markSupported() == false) {
                 compressedInput = new BufferedInputStream(compressedInput);
             }
@@ -106,7 +106,7 @@ public class XContentHelper {
         InputStream input;
         Compressor compressor = CompressorFactory.compressor(bytes);
         if (compressor != null) {
-            InputStream compressedStreamInput = compressor.threadLocalStreamInput(bytes.streamInput());
+            InputStream compressedStreamInput = compressor.threadLocalInputStream(bytes.streamInput());
             if (compressedStreamInput.markSupported() == false) {
                 compressedStreamInput = new BufferedInputStream(compressedStreamInput);
             }
@@ -355,7 +355,7 @@ public class XContentHelper {
                                      ToXContent.Params params) throws IOException {
         Compressor compressor = CompressorFactory.compressor(source);
         if (compressor != null) {
-            try (InputStream compressedStreamInput = compressor.threadLocalStreamInput(source.streamInput())) {
+            try (InputStream compressedStreamInput = compressor.threadLocalInputStream(source.streamInput())) {
                 builder.rawField(field, compressedStreamInput);
             }
         } else {
@@ -374,7 +374,7 @@ public class XContentHelper {
         Objects.requireNonNull(xContentType);
         Compressor compressor = CompressorFactory.compressor(source);
         if (compressor != null) {
-            try (InputStream compressedStreamInput = compressor.threadLocalStreamInput(source.streamInput())) {
+            try (InputStream compressedStreamInput = compressor.threadLocalInputStream(source.streamInput())) {
                 builder.rawField(field, compressedStreamInput, xContentType);
             }
         } else {
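
A note on the recurring `markSupported` check in these hunks: the parser needs to peek at the leading bytes to guess the content type and then rewind, and a decompressing stream does not guarantee mark/reset, so it gets buffered. A hedged helper-style sketch of the pattern (`markable` is an illustrative name, not a method in the commit):

```java
import java.io.BufferedInputStream;
import java.io.InputStream;

class MarkableInput {
    // Ensure mark/reset support so content-type sniffing can rewind the
    // stream before the real parse starts.
    static InputStream markable(InputStream decompressing) {
        return decompressing.markSupported() ? decompressing : new BufferedInputStream(decompressing);
    }
}
```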

View File

@@ -35,10 +35,13 @@ import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.compress.Compressor;
 import org.elasticsearch.common.compress.CompressorFactory;
 import org.elasticsearch.common.io.stream.BytesStreamOutput;
+import org.elasticsearch.common.io.stream.InputStreamStreamInput;
 import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
 import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
+import org.elasticsearch.common.io.stream.OutputStreamStreamOutput;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.core.internal.io.IOUtils;
 import org.elasticsearch.discovery.AckClusterStatePublishResponseHandler;
@@ -354,7 +357,7 @@ public class PublishClusterStateAction {
     public static BytesReference serializeFullClusterState(ClusterState clusterState, Version nodeVersion) throws IOException {
         BytesStreamOutput bStream = new BytesStreamOutput();
-        try (StreamOutput stream = CompressorFactory.COMPRESSOR.threadLocalStreamOutput(bStream)) {
+        try (StreamOutput stream = new OutputStreamStreamOutput(CompressorFactory.COMPRESSOR.threadLocalOutputStream(bStream))) {
             stream.setVersion(nodeVersion);
             stream.writeBoolean(true);
             clusterState.writeTo(stream);
@@ -364,7 +367,7 @@ public class PublishClusterStateAction {
     public static BytesReference serializeDiffClusterState(Diff diff, Version nodeVersion) throws IOException {
         BytesStreamOutput bStream = new BytesStreamOutput();
-        try (StreamOutput stream = CompressorFactory.COMPRESSOR.threadLocalStreamOutput(bStream)) {
+        try (StreamOutput stream = new OutputStreamStreamOutput(CompressorFactory.COMPRESSOR.threadLocalOutputStream(bStream))) {
             stream.setVersion(nodeVersion);
             stream.writeBoolean(false);
             diff.writeTo(stream);
@@ -382,7 +385,7 @@ public class PublishClusterStateAction {
         synchronized (lastSeenClusterStateMutex) {
             try {
                 if (compressor != null) {
-                    in = compressor.threadLocalStreamInput(in);
+                    in = new InputStreamStreamInput(compressor.threadLocalInputStream(in));
                 }
                 in = new NamedWriteableAwareStreamInput(in, namedWriteableRegistry);
                 in.setVersion(request.version());

View File

@@ -67,7 +67,6 @@ import org.elasticsearch.common.component.AbstractLifecycleComponent;
 import org.elasticsearch.common.compress.CompressorFactory;
 import org.elasticsearch.common.compress.NotXContentException;
 import org.elasticsearch.common.io.Streams;
-import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.lease.Releasable;
 import org.elasticsearch.common.lucene.Lucene;
 import org.elasticsearch.common.lucene.store.InputStreamIndexInput;
@@ -1347,7 +1346,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
     }

     private RepositoryData repositoryDataFromCachedEntry(Tuple<Long, BytesReference> cacheEntry) throws IOException {
-        try (StreamInput input = CompressorFactory.COMPRESSOR.threadLocalStreamInput(cacheEntry.v2().streamInput())) {
+        try (InputStream input = CompressorFactory.COMPRESSOR.threadLocalInputStream(cacheEntry.v2().streamInput())) {
             return RepositoryData.snapshotsFromXContent(
                 XContentType.JSON.xContent().createParser(NamedXContentRegistry.EMPTY,
                     LoggingDeprecationHandler.INSTANCE, input), cacheEntry.v1(), false);

View File

@@ -158,7 +158,7 @@ public final class ChecksumBlobStoreFormat<T extends ToXContent> {
                 // in order to write the footer we need to prevent closing the actual index input.
             }
         }; XContentBuilder builder = XContentFactory.contentBuilder(XContentType.SMILE,
-            compress ? CompressorFactory.COMPRESSOR.threadLocalStreamOutput(indexOutputOutputStream)
+            compress ? CompressorFactory.COMPRESSOR.threadLocalOutputStream(indexOutputOutputStream)
                 : indexOutputOutputStream)) {
             builder.startObject();
             obj.toXContent(builder, SNAPSHOT_ONLY_FORMAT_PARAMS);

View File

@@ -27,6 +27,7 @@ import org.elasticsearch.common.io.stream.BytesStream;
 import org.elasticsearch.common.io.stream.StreamOutput;

 import java.io.IOException;
+import java.io.OutputStream;
 import java.util.zip.DeflaterOutputStream;

 /**
@@ -44,7 +45,7 @@ import java.util.zip.DeflaterOutputStream;
  */
 final class CompressibleBytesOutputStream extends StreamOutput {

-    private final StreamOutput stream;
+    private final OutputStream stream;
     private final BytesStream bytesStreamOutput;
     private final boolean shouldCompress;
@@ -52,7 +53,7 @@ final class CompressibleBytesOutputStream extends StreamOutput {
         this.bytesStreamOutput = bytesStreamOutput;
         this.shouldCompress = shouldCompress;
         if (shouldCompress) {
-            this.stream = CompressorFactory.COMPRESSOR.threadLocalStreamOutput(Streams.flushOnCloseStream(bytesStreamOutput));
+            this.stream = CompressorFactory.COMPRESSOR.threadLocalOutputStream(Streams.flushOnCloseStream(bytesStreamOutput));
         } else {
             this.stream = bytesStreamOutput;
         }
@@ -82,7 +83,7 @@ final class CompressibleBytesOutputStream extends StreamOutput {
     @Override
     public void writeBytes(byte[] b, int offset, int length) throws IOException {
-        stream.writeBytes(b, offset, length);
+        stream.write(b, offset, length);
     }
@Override
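
The hunk is truncated at the next `@Override`. Since the `stream` field is now a plain `OutputStream`, the `StreamOutput` primitives this class implements forward to `OutputStream.write`; the single-byte override presumably changes the same way (hypothetical sketch, not verbatim from the commit):

```java
// Hypothetical counterpart to the writeBytes change above: a plain
// OutputStream exposes write(int), so the single-byte primitive forwards
// to it directly.
@Override
public void writeByte(byte b) throws IOException {
    stream.write(b);
}
```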

View File

@@ -23,6 +23,7 @@ import org.apache.logging.log4j.Logger;
 import org.elasticsearch.Version;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.compress.CompressorFactory;
+import org.elasticsearch.common.io.stream.InputStreamStreamInput;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.util.concurrent.ThreadContext;
 import org.elasticsearch.core.internal.io.IOUtils;
@@ -168,7 +169,7 @@ public final class TransportLogger {
     private static StreamInput decompressingStream(byte status, StreamInput streamInput) throws IOException {
         if (TransportStatus.isCompress(status) && streamInput.available() > 0) {
             try {
-                return CompressorFactory.COMPRESSOR.threadLocalStreamInput(streamInput);
+                return new InputStreamStreamInput(CompressorFactory.COMPRESSOR.threadLocalInputStream(streamInput));
             } catch (IllegalArgumentException e) {
                 throw new IllegalStateException("stream marked as compressed, but is missing deflate header");
             }
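
The `IllegalArgumentException` caught here comes from the deflate header check performed when the decompressing stream is opened. A hedged sketch of that guard (`HEADER` access and the message text are assumptions, not verbatim from `DeflateCompressor`):

```java
// Sketch: validate the fixed deflate header before wiring up the
// decompressing stream; a mismatch signals the payload is not DEFLATE data.
byte[] header = new byte[HEADER.length]; // assumption: HEADER as in DeflateCompressor
if (in.read(header) != header.length || Arrays.equals(header, HEADER) == false) {
    throw new IllegalArgumentException("input stream is not compressed with DEFLATE");
}
```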

View File

@@ -21,15 +21,13 @@ package org.elasticsearch.common.compress;
 import org.apache.lucene.util.LineFileDocs;
 import org.apache.lucene.util.TestUtil;
-import org.elasticsearch.common.io.stream.ByteBufferStreamInput;
-import org.elasticsearch.common.io.stream.OutputStreamStreamOutput;
-import org.elasticsearch.common.io.stream.StreamInput;
-import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.test.ESTestCase;

+import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
-import java.nio.ByteBuffer;
+import java.io.InputStream;
+import java.io.OutputStream;
 import java.nio.charset.StandardCharsets;
 import java.util.Random;
 import java.util.concurrent.CountDownLatch;
@@ -384,33 +382,26 @@ public class DeflateCompressTests extends ESTestCase {
     }

     private void doTest(byte bytes[]) throws IOException {
-        ByteBuffer bb = ByteBuffer.wrap(bytes);
-        StreamInput rawIn = new ByteBufferStreamInput(bb);
+        InputStream rawIn = new ByteArrayInputStream(bytes);
         Compressor c = compressor;

         ByteArrayOutputStream bos = new ByteArrayOutputStream();
-        OutputStreamStreamOutput rawOs = new OutputStreamStreamOutput(bos);
-        StreamOutput os = c.threadLocalStreamOutput(rawOs);
-        Random r = random();
+        final Random r = random();
         int bufferSize = r.nextBoolean() ? 65535 : TestUtil.nextInt(random(), 1, 70000);
         int prepadding = r.nextInt(70000);
         int postpadding = r.nextInt(70000);
-        byte buffer[] = new byte[prepadding + bufferSize + postpadding];
-        r.nextBytes(buffer); // fill block completely with junk
+        byte[] buffer = new byte[prepadding + bufferSize + postpadding];
         int len;
+        try (OutputStream os = c.threadLocalOutputStream(bos)) {
+            r.nextBytes(buffer); // fill block completely with junk
             while ((len = rawIn.read(buffer, prepadding, bufferSize)) != -1) {
                 os.write(buffer, prepadding, len);
             }
-        os.close();
+        }
         rawIn.close();

         // now we have compressed byte array
-        byte compressed[] = bos.toByteArray();
-        ByteBuffer bb2 = ByteBuffer.wrap(compressed);
-        StreamInput compressedIn = new ByteBufferStreamInput(bb2);
-        StreamInput in = c.threadLocalStreamInput(compressedIn);
+        InputStream in = c.threadLocalInputStream(new ByteArrayInputStream(bos.toByteArray()));

         // randomize constants again
         bufferSize = r.nextBoolean() ? 65535 : TestUtil.nextInt(random(), 1, 70000);
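
The hunk cuts off before the verification half of `doTest`; it presumably continues by reading `in` back with the re-randomized constants and asserting the result equals the original input (hypothetical continuation, not verbatim from the test):

```java
// Hypothetical read-back: drain the decompressing stream through another
// junk-padded buffer and compare against the original bytes.
prepadding = r.nextInt(70000);
postpadding = r.nextInt(70000);
buffer = new byte[prepadding + bufferSize + postpadding];
r.nextBytes(buffer); // fill block completely with junk
ByteArrayOutputStream uncompressed = new ByteArrayOutputStream();
while ((len = in.read(buffer, prepadding, bufferSize)) != -1) {
    uncompressed.write(buffer, prepadding, len);
}
in.close();
assertArrayEquals(bytes, uncompressed.toByteArray());
```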

View File

@@ -22,11 +22,11 @@ package org.elasticsearch.common.compress;
 import org.apache.lucene.util.TestUtil;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.io.stream.BytesStreamOutput;
-import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.test.ESTestCase;
 import org.junit.Assert;

 import java.io.IOException;
+import java.io.OutputStream;
 import java.util.Random;

 import static org.hamcrest.Matchers.equalTo;
@@ -69,18 +69,18 @@ public class DeflateCompressedXContentTests extends ESTestCase {
     public void testDifferentCompressedRepresentation() throws Exception {
         byte[] b = "---\nf:abcdefghijabcdefghij".getBytes("UTF-8");
         BytesStreamOutput bout = new BytesStreamOutput();
-        StreamOutput out = compressor.threadLocalStreamOutput(bout);
-        out.writeBytes(b);
+        try (OutputStream out = compressor.threadLocalOutputStream(bout)) {
+            out.write(b);
             out.flush();
-        out.writeBytes(b);
-        out.close();
+            out.write(b);
+        }
         final BytesReference b1 = bout.bytes();

         bout = new BytesStreamOutput();
-        out = compressor.threadLocalStreamOutput(bout);
-        out.writeBytes(b);
-        out.writeBytes(b);
-        out.close();
+        try (OutputStream out = compressor.threadLocalOutputStream(bout)) {
+            out.write(b);
+            out.write(b);
+        }
         final BytesReference b2 = bout.bytes();

         // because of the intermediate flush, the two compressed representations

View File

@@ -24,10 +24,10 @@ import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.compress.CompressorFactory;
 import org.elasticsearch.common.io.stream.BytesStreamOutput;
-import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.xcontent.XContentBuilder;

 import java.io.IOException;
+import java.io.OutputStream;
 import java.util.Arrays;

 import static org.hamcrest.Matchers.equalTo;
@@ -61,7 +61,7 @@ public class BinaryFieldMapperTests extends MapperTestCase {
         // case 2: a value that looks compressed: this used to fail in 1.x
         BytesStreamOutput out = new BytesStreamOutput();
-        try (StreamOutput compressed = CompressorFactory.COMPRESSOR.threadLocalStreamOutput(out)) {
+        try (OutputStream compressed = CompressorFactory.COMPRESSOR.threadLocalOutputStream(out)) {
             new BytesArray(binaryValue1).writeTo(compressed);
         }
         final byte[] binaryValue2 = BytesReference.toBytes(out.bytes());

View File

@@ -23,6 +23,7 @@ import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.compress.CompressorFactory;
 import org.elasticsearch.common.io.stream.BytesStream;
 import org.elasticsearch.common.io.stream.BytesStreamOutput;
+import org.elasticsearch.common.io.stream.InputStreamStreamInput;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.test.ESTestCase;
@@ -71,7 +72,7 @@ public class CompressibleBytesOutputStreamTests extends ESTestCase {
         assertTrue(CompressorFactory.COMPRESSOR.isCompressed(bytesRef));

-        StreamInput streamInput = CompressorFactory.COMPRESSOR.threadLocalStreamInput(bytesRef.streamInput());
+        StreamInput streamInput = new InputStreamStreamInput(CompressorFactory.COMPRESSOR.threadLocalInputStream(bytesRef.streamInput()));
         byte[] actualBytes = new byte[expectedBytes.length];
         streamInput.readBytes(actualBytes, 0, expectedBytes.length);
@@ -94,7 +95,8 @@ public class CompressibleBytesOutputStreamTests extends ESTestCase {
         stream.write(expectedBytes);

-        StreamInput streamInput = CompressorFactory.COMPRESSOR.threadLocalStreamInput(bStream.bytes().streamInput());
+        StreamInput streamInput =
+            new InputStreamStreamInput(CompressorFactory.COMPRESSOR.threadLocalInputStream(bStream.bytes().streamInput()));
         byte[] actualBytes = new byte[expectedBytes.length];
         EOFException e = expectThrows(EOFException.class, () -> streamInput.readBytes(actualBytes, 0, expectedBytes.length));
         assertEquals("Unexpected end of ZLIB input stream", e.getMessage());

View File

@@ -25,6 +25,7 @@ import org.elasticsearch.common.bytes.ReleasableBytesReference;
 import org.elasticsearch.common.compress.CompressorFactory;
 import org.elasticsearch.common.io.Streams;
 import org.elasticsearch.common.io.stream.BytesStreamOutput;
+import org.elasticsearch.common.io.stream.OutputStreamStreamOutput;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.lease.Releasables;
@@ -32,15 +33,16 @@ import org.elasticsearch.common.util.PageCacheRecycler;
 import org.elasticsearch.test.ESTestCase;

 import java.io.IOException;
+import java.io.OutputStream;

 public class TransportDecompressorTests extends ESTestCase {

     public void testSimpleCompression() throws IOException {
         try (BytesStreamOutput output = new BytesStreamOutput()) {
-            StreamOutput deflateStream = CompressorFactory.COMPRESSOR.threadLocalStreamOutput(Streams.flushOnCloseStream(output));
             byte randomByte = randomByte();
+            try (OutputStream deflateStream = CompressorFactory.COMPRESSOR.threadLocalOutputStream(Streams.flushOnCloseStream(output))) {
                 deflateStream.write(randomByte);
-            deflateStream.close();
+            }

             BytesReference bytes = output.bytes();
@@ -57,11 +59,12 @@ public class TransportDecompressorTests extends ESTestCase {

     public void testMultiPageCompression() throws IOException {
         try (BytesStreamOutput output = new BytesStreamOutput()) {
-            StreamOutput deflateStream = CompressorFactory.COMPRESSOR.threadLocalStreamOutput(Streams.flushOnCloseStream(output));
+            try (StreamOutput deflateStream = new OutputStreamStreamOutput(CompressorFactory.COMPRESSOR.threadLocalOutputStream(
+                    Streams.flushOnCloseStream(output)))) {
                 for (int i = 0; i < 10000; ++i) {
                     deflateStream.writeInt(i);
                 }
-            deflateStream.close();
+            }

             BytesReference bytes = output.bytes();
@@ -85,11 +88,12 @@ public class TransportDecompressorTests extends ESTestCase {

     public void testIncrementalMultiPageCompression() throws IOException {
         try (BytesStreamOutput output = new BytesStreamOutput()) {
-            StreamOutput deflateStream = CompressorFactory.COMPRESSOR.threadLocalStreamOutput(Streams.flushOnCloseStream(output));
+            try (StreamOutput deflateStream = new OutputStreamStreamOutput(
+                    CompressorFactory.COMPRESSOR.threadLocalOutputStream(Streams.flushOnCloseStream(output)))) {
                 for (int i = 0; i < 10000; ++i) {
                     deflateStream.writeInt(i);
                 }
-            deflateStream.close();
+            }

             BytesReference bytes = output.bytes();

View File

@@ -18,7 +18,6 @@ import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.compress.CompressorFactory;
 import org.elasticsearch.common.compress.DeflateCompressor;
 import org.elasticsearch.common.io.stream.BytesStreamOutput;
-import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.time.DateFormatter;
 import org.elasticsearch.common.util.concurrent.ThreadContext;
 import org.elasticsearch.common.xcontent.ToXContent;
@@ -84,7 +83,7 @@ class HttpExportBulk extends ExportBulk {
         if (docs != null && docs.isEmpty() == false) {
             final BytesStreamOutput scratch = new BytesStreamOutput();
             final CountingOutputStream countingStream;
-            try (StreamOutput payload = CompressorFactory.COMPRESSOR.threadLocalStreamOutput(scratch)) {
+            try (OutputStream payload = CompressorFactory.COMPRESSOR.threadLocalOutputStream(scratch)) {
                 countingStream = new CountingOutputStream(payload);
                 for (MonitoringDoc monitoringDoc : docs) {
                     writeDocument(monitoringDoc, countingStream);