Closing a ReleasableBytesStreamOutput closes the underlying BigArray (#23941)

This commit makes closing a ReleasableBytesStreamOutput release the underlying BigArray so
that we can use try-with-resources with these streams and avoid leaking memory by not returning
the BigArray. As part of this change, the ReleasableBytesStreamOutput adds protection to only
release the BigArray once.

In order to make some of the changes cleaner, the ReleasableBytesStream interface has been
removed. The BytesStream interface is changed to a abstract class so that we can use it as a
useable return type for a new method, Streams#flushOnCloseStream. This new method wraps a
given stream and overrides the close method so that the stream is simply flushed and not closed.
This behavior is used in the TcpTransport when compression is used with a
ReleasableBytesStreamOutput as we need to close the compressed stream to ensure all of the data
is written from this stream. Closing the compressed stream will try to close the underlying stream
but we only want to flush so that all of the written bytes are available.

Additionally, an error message method added in the BytesRestResponse did not use a builder
provided by the channel and instead created its own JSON builder. This changes that method to use
the channel builder and in turn the bytes stream output that is managed by the channel.

Note, this commit differs from 6bfecdf921 in that it updates
ReleasableBytesStreamOutput to handle the case of the BigArray decreasing in size, which changes
the reference to the BigArray. When the reference is changed, the releasable needs to be updated
otherwise there could be a leak of bytes and corruption of data in unrelated streams.

This reverts commit afd45c1432, which reverted #23572.
This commit is contained in:
Jay Modi 2017-04-14 10:50:31 -04:00 committed by GitHub
parent e3aa2a89f9
commit 30ab8739a6
17 changed files with 275 additions and 89 deletions

View File

@ -30,13 +30,17 @@ import org.elasticsearch.common.util.ByteArray;
*/ */
public final class ReleasablePagedBytesReference extends PagedBytesReference implements Releasable { public final class ReleasablePagedBytesReference extends PagedBytesReference implements Releasable {
public ReleasablePagedBytesReference(BigArrays bigarrays, ByteArray byteArray, int length) { private final Releasable releasable;
public ReleasablePagedBytesReference(BigArrays bigarrays, ByteArray byteArray, int length,
Releasable releasable) {
super(bigarrays, byteArray, length); super(bigarrays, byteArray, length);
this.releasable = releasable;
} }
@Override @Override
public void close() { public void close() {
Releasables.close(byteArray); Releasables.close(releasable);
} }
} }

View File

@ -20,6 +20,7 @@
package org.elasticsearch.common.compress; package org.elasticsearch.common.compress;
import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
@ -31,5 +32,9 @@ public interface Compressor {
StreamInput streamInput(StreamInput in) throws IOException; StreamInput streamInput(StreamInput in) throws IOException;
/**
* Creates a new stream output that compresses the contents and writes to the provided stream
* output. Closing the returned {@link StreamOutput} will close the provided stream output.
*/
StreamOutput streamOutput(StreamOutput out) throws IOException; StreamOutput streamOutput(StreamOutput out) throws IOException;
} }

View File

@ -20,7 +20,6 @@
package org.elasticsearch.common.compress; package org.elasticsearch.common.compress;
import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.compress.Compressor;
import org.elasticsearch.common.io.stream.InputStreamStreamInput; import org.elasticsearch.common.io.stream.InputStreamStreamInput;
import org.elasticsearch.common.io.stream.OutputStreamStreamOutput; import org.elasticsearch.common.io.stream.OutputStreamStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
@ -47,7 +46,7 @@ public class DeflateCompressor implements Compressor {
// It needs to be different from other compressors and to not be specific // It needs to be different from other compressors and to not be specific
// enough so that no stream starting with these bytes could be detected as // enough so that no stream starting with these bytes could be detected as
// a XContent // a XContent
private static final byte[] HEADER = new byte[] { 'D', 'F', 'L', '\0' }; private static final byte[] HEADER = new byte[]{'D', 'F', 'L', '\0'};
// 3 is a good trade-off between speed and compression ratio // 3 is a good trade-off between speed and compression ratio
private static final int LEVEL = 3; private static final int LEVEL = 3;
// We use buffering on the input and output of in/def-laters in order to // We use buffering on the input and output of in/def-laters in order to
@ -88,6 +87,7 @@ public class DeflateCompressor implements Compressor {
decompressedIn = new BufferedInputStream(decompressedIn, BUFFER_SIZE); decompressedIn = new BufferedInputStream(decompressedIn, BUFFER_SIZE);
return new InputStreamStreamInput(decompressedIn) { return new InputStreamStreamInput(decompressedIn) {
final AtomicBoolean closed = new AtomicBoolean(false); final AtomicBoolean closed = new AtomicBoolean(false);
public void close() throws IOException { public void close() throws IOException {
try { try {
super.close(); super.close();
@ -107,10 +107,11 @@ public class DeflateCompressor implements Compressor {
final boolean nowrap = true; final boolean nowrap = true;
final Deflater deflater = new Deflater(LEVEL, nowrap); final Deflater deflater = new Deflater(LEVEL, nowrap);
final boolean syncFlush = true; final boolean syncFlush = true;
OutputStream compressedOut = new DeflaterOutputStream(out, deflater, BUFFER_SIZE, syncFlush); DeflaterOutputStream deflaterOutputStream = new DeflaterOutputStream(out, deflater, BUFFER_SIZE, syncFlush);
compressedOut = new BufferedOutputStream(compressedOut, BUFFER_SIZE); OutputStream compressedOut = new BufferedOutputStream(deflaterOutputStream, BUFFER_SIZE);
return new OutputStreamStreamOutput(compressedOut) { return new OutputStreamStreamOutput(compressedOut) {
final AtomicBoolean closed = new AtomicBoolean(false); final AtomicBoolean closed = new AtomicBoolean(false);
public void close() throws IOException { public void close() throws IOException {
try { try {
super.close(); super.close();

View File

@ -1,32 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.io;
import org.elasticsearch.common.bytes.ReleasablePagedBytesReference;
/**
* A bytes stream that requires its bytes to be released once no longer used.
*/
public interface ReleasableBytesStream extends BytesStream {
@Override
ReleasablePagedBytesReference bytes();
}

View File

@ -20,6 +20,9 @@
package org.elasticsearch.common.io; package org.elasticsearch.common.io;
import org.apache.lucene.util.IOUtils; import org.apache.lucene.util.IOUtils;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.BytesStream;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.util.Callback; import org.elasticsearch.common.util.Callback;
import java.io.BufferedReader; import java.io.BufferedReader;
@ -236,4 +239,56 @@ public abstract class Streams {
} }
} }
} }
/**
* Wraps the given {@link BytesStream} in a {@link StreamOutput} that simply flushes when
* close is called.
*/
public static BytesStream flushOnCloseStream(BytesStream os) {
return new FlushOnCloseOutputStream(os);
}
/**
* A wrapper around a {@link BytesStream} that makes the close operation a flush. This is
* needed as sometimes a stream will be closed but the bytes that the stream holds still need
* to be used and the stream cannot be closed until the bytes have been consumed.
*/
private static class FlushOnCloseOutputStream extends BytesStream {
private final BytesStream delegate;
private FlushOnCloseOutputStream(BytesStream bytesStreamOutput) {
this.delegate = bytesStreamOutput;
}
@Override
public void writeByte(byte b) throws IOException {
delegate.writeByte(b);
}
@Override
public void writeBytes(byte[] b, int offset, int length) throws IOException {
delegate.writeBytes(b, offset, length);
}
@Override
public void flush() throws IOException {
delegate.flush();
}
@Override
public void close() throws IOException {
flush();
}
@Override
public void reset() throws IOException {
delegate.reset();
}
@Override
public BytesReference bytes() {
return delegate.bytes();
}
}
} }

View File

@ -17,11 +17,11 @@
* under the License. * under the License.
*/ */
package org.elasticsearch.common.io; package org.elasticsearch.common.io.stream;
import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.BytesReference;
public interface BytesStream { public abstract class BytesStream extends StreamOutput {
BytesReference bytes(); public abstract BytesReference bytes();
} }

View File

@ -21,7 +21,6 @@ package org.elasticsearch.common.io.stream;
import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.bytes.PagedBytesReference; import org.elasticsearch.common.bytes.PagedBytesReference;
import org.elasticsearch.common.io.BytesStream;
import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.ByteArray; import org.elasticsearch.common.util.ByteArray;
@ -31,7 +30,7 @@ import java.io.IOException;
* A @link {@link StreamOutput} that uses {@link BigArrays} to acquire pages of * A @link {@link StreamOutput} that uses {@link BigArrays} to acquire pages of
* bytes, which avoids frequent reallocation & copying of the internal data. * bytes, which avoids frequent reallocation & copying of the internal data.
*/ */
public class BytesStreamOutput extends StreamOutput implements BytesStream { public class BytesStreamOutput extends BytesStream {
protected final BigArrays bigArrays; protected final BigArrays bigArrays;
@ -151,7 +150,7 @@ public class BytesStreamOutput extends StreamOutput implements BytesStream {
return bytes.ramBytesUsed(); return bytes.ramBytesUsed();
} }
private void ensureCapacity(long offset) { void ensureCapacity(long offset) {
if (offset > Integer.MAX_VALUE) { if (offset > Integer.MAX_VALUE) {
throw new IllegalArgumentException(getClass().getSimpleName() + " cannot hold more than 2GB of data"); throw new IllegalArgumentException(getClass().getSimpleName() + " cannot hold more than 2GB of data");
} }

View File

@ -20,29 +20,66 @@
package org.elasticsearch.common.io.stream; package org.elasticsearch.common.io.stream;
import org.elasticsearch.common.bytes.ReleasablePagedBytesReference; import org.elasticsearch.common.bytes.ReleasablePagedBytesReference;
import org.elasticsearch.common.io.ReleasableBytesStream; import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.ByteArray;
/** /**
* An bytes stream output that allows providing a {@link BigArrays} instance * An bytes stream output that allows providing a {@link BigArrays} instance
* expecting it to require releasing its content ({@link #bytes()}) once done. * expecting it to require releasing its content ({@link #bytes()}) once done.
* <p> * <p>
* Please note, its is the responsibility of the caller to make sure the bytes * Please note, closing this stream will release the bytes that are in use by any
* reference do not "escape" and are released only once. * {@link ReleasablePagedBytesReference} returned from {@link #bytes()}, so this
* stream should only be closed after the bytes have been output or copied
* elsewhere.
*/ */
public class ReleasableBytesStreamOutput extends BytesStreamOutput implements ReleasableBytesStream { public class ReleasableBytesStreamOutput extends BytesStreamOutput
implements Releasable {
private Releasable releasable;
public ReleasableBytesStreamOutput(BigArrays bigarrays) { public ReleasableBytesStreamOutput(BigArrays bigarrays) {
super(BigArrays.PAGE_SIZE_IN_BYTES, bigarrays); this(BigArrays.PAGE_SIZE_IN_BYTES, bigarrays);
} }
public ReleasableBytesStreamOutput(int expectedSize, BigArrays bigArrays) { public ReleasableBytesStreamOutput(int expectedSize, BigArrays bigArrays) {
super(expectedSize, bigArrays); super(expectedSize, bigArrays);
this.releasable = Releasables.releaseOnce(this.bytes);
}
/**
* Returns a {@link Releasable} implementation of a
* {@link org.elasticsearch.common.bytes.BytesReference} that represents the current state of
* the bytes in the stream.
*/
@Override
public ReleasablePagedBytesReference bytes() {
return new ReleasablePagedBytesReference(bigArrays, bytes, count, releasable);
} }
@Override @Override
public ReleasablePagedBytesReference bytes() { public void close() {
return new ReleasablePagedBytesReference(bigArrays, bytes, count); Releasables.close(releasable);
} }
@Override
void ensureCapacity(long offset) {
final ByteArray prevBytes = this.bytes;
super.ensureCapacity(offset);
if (prevBytes != this.bytes) {
// re-create the releasable with the new reference
releasable = Releasables.releaseOnce(this.bytes);
}
}
@Override
public void reset() {
final ByteArray prevBytes = this.bytes;
super.reset();
if (prevBytes != this.bytes) {
// re-create the releasable with the new reference
releasable = Releasables.releaseOnce(this.bytes);
}
}
} }

View File

@ -22,7 +22,7 @@ package org.elasticsearch.common.xcontent;
import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.geo.GeoPoint; import org.elasticsearch.common.geo.GeoPoint;
import org.elasticsearch.common.io.BytesStream; import org.elasticsearch.common.io.stream.BytesStream;
import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.text.Text; import org.elasticsearch.common.text.Text;
@ -53,7 +53,7 @@ import java.util.concurrent.TimeUnit;
/** /**
* A utility to build XContent (ie json). * A utility to build XContent (ie json).
*/ */
public final class XContentBuilder implements BytesStream, Releasable, Flushable { public final class XContentBuilder implements Releasable, Flushable {
/** /**
* Create a new {@link XContentBuilder} using the given {@link XContent} content. * Create a new {@link XContentBuilder} using the given {@link XContent} content.
@ -1041,7 +1041,6 @@ public final class XContentBuilder implements BytesStream, Releasable, Flushable
return this.generator; return this.generator;
} }
@Override
public BytesReference bytes() { public BytesReference bytes() {
close(); close();
return ((BytesStream) bos).bytes(); return ((BytesStream) bos).bytes();

View File

@ -439,7 +439,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
} }
throw new TranslogException(shardId, "Failed to write operation [" + operation + "]", e); throw new TranslogException(shardId, "Failed to write operation [" + operation + "]", e);
} finally { } finally {
Releasables.close(out.bytes()); Releasables.close(out);
} }
} }
@ -1332,7 +1332,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
bytes.writeTo(outStream); bytes.writeTo(outStream);
} }
} finally { } finally {
Releasables.close(out.bytes()); Releasables.close(out);
} }
} }

View File

@ -20,12 +20,14 @@ package org.elasticsearch.rest;
import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings; import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.common.xcontent.XContentType;
import java.io.IOException; import java.io.IOException;
import java.io.OutputStream;
import java.util.Collections; import java.util.Collections;
import java.util.Set; import java.util.Set;
import java.util.function.Predicate; import java.util.function.Predicate;
@ -97,7 +99,9 @@ public abstract class AbstractRestChannel implements RestChannel {
excludes = filters.stream().filter(EXCLUDE_FILTER).map(f -> f.substring(1)).collect(toSet()); excludes = filters.stream().filter(EXCLUDE_FILTER).map(f -> f.substring(1)).collect(toSet());
} }
XContentBuilder builder = new XContentBuilder(XContentFactory.xContent(responseContentType), bytesOutput(), includes, excludes); OutputStream unclosableOutputStream = Streams.flushOnCloseStream(bytesOutput());
XContentBuilder builder =
new XContentBuilder(XContentFactory.xContent(responseContentType), unclosableOutputStream, includes, excludes);
if (pretty) { if (pretty) {
builder.prettyPrint().lfAtEnd(); builder.prettyPrint().lfAtEnd();
} }
@ -107,8 +111,9 @@ public abstract class AbstractRestChannel implements RestChannel {
} }
/** /**
* A channel level bytes output that can be reused. It gets reset on each call to this * A channel level bytes output that can be reused. The bytes output is lazily instantiated
* method. * by a call to {@link #newBytesOutput()}. Once the stream is created, it gets reset on each
* call to this method.
*/ */
@Override @Override
public final BytesStreamOutput bytesOutput() { public final BytesStreamOutput bytesOutput() {
@ -120,6 +125,14 @@ public abstract class AbstractRestChannel implements RestChannel {
return bytesOut; return bytesOut;
} }
/**
* An accessor to the raw value of the channel bytes output. This method will not instantiate
* a new stream if one does not exist and this method will not reset the stream.
*/
protected final BytesStreamOutput bytesOutputOrNull() {
return bytesOut;
}
protected BytesStreamOutput newBytesOutput() { protected BytesStreamOutput newBytesOutput() {
return new BytesStreamOutput(); return new BytesStreamOutput();
} }

View File

@ -30,7 +30,6 @@ import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.logging.ESLoggerFactory; import org.elasticsearch.common.logging.ESLoggerFactory;
import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentParser;
import java.io.IOException; import java.io.IOException;
@ -147,8 +146,8 @@ public class BytesRestResponse extends RestResponse {
return builder; return builder;
} }
static BytesRestResponse createSimpleErrorResponse(RestStatus status, String errorMessage) throws IOException { static BytesRestResponse createSimpleErrorResponse(RestChannel channel, RestStatus status, String errorMessage) throws IOException {
return new BytesRestResponse(status, JsonXContent.contentBuilder().startObject() return new BytesRestResponse(status, channel.newErrorBuilder().startObject()
.field("error", errorMessage) .field("error", errorMessage)
.field("status", status.getStatus()) .field("status", status.getStatus())
.endObject()); .endObject());

View File

@ -178,8 +178,9 @@ public class RestController extends AbstractComponent implements HttpServerTrans
sendContentTypeErrorMessage(request, responseChannel); sendContentTypeErrorMessage(request, responseChannel);
} else if (contentLength > 0 && handler != null && handler.supportsContentStream() && } else if (contentLength > 0 && handler != null && handler.supportsContentStream() &&
request.getXContentType() != XContentType.JSON && request.getXContentType() != XContentType.SMILE) { request.getXContentType() != XContentType.JSON && request.getXContentType() != XContentType.SMILE) {
responseChannel.sendResponse(BytesRestResponse.createSimpleErrorResponse(RestStatus.NOT_ACCEPTABLE, "Content-Type [" + responseChannel.sendResponse(BytesRestResponse.createSimpleErrorResponse(responseChannel,
request.getXContentType() + "] does not support stream parsing. Use JSON or SMILE instead")); RestStatus.NOT_ACCEPTABLE, "Content-Type [" + request.getXContentType() +
"] does not support stream parsing. Use JSON or SMILE instead"));
} else { } else {
if (canTripCircuitBreaker(request)) { if (canTripCircuitBreaker(request)) {
inFlightRequestsBreaker(circuitBreakerService).addEstimateBytesAndMaybeBreak(contentLength, "<http_request>"); inFlightRequestsBreaker(circuitBreakerService).addEstimateBytesAndMaybeBreak(contentLength, "<http_request>");
@ -229,7 +230,8 @@ public class RestController extends AbstractComponent implements HttpServerTrans
void dispatchRequest(final RestRequest request, final RestChannel channel, final NodeClient client, ThreadContext threadContext, void dispatchRequest(final RestRequest request, final RestChannel channel, final NodeClient client, ThreadContext threadContext,
final RestHandler handler) throws Exception { final RestHandler handler) throws Exception {
if (checkRequestParameters(request, channel) == false) { if (checkRequestParameters(request, channel) == false) {
channel.sendResponse(BytesRestResponse.createSimpleErrorResponse(BAD_REQUEST, "error traces in responses are disabled.")); channel
.sendResponse(BytesRestResponse.createSimpleErrorResponse(channel,BAD_REQUEST, "error traces in responses are disabled."));
} else { } else {
for (String key : headersToCopy) { for (String key : headersToCopy) {
String httpHeader = request.header(key); String httpHeader = request.header(key);
@ -283,7 +285,7 @@ public class RestController extends AbstractComponent implements HttpServerTrans
Strings.collectionToCommaDelimitedString(restRequest.getAllHeaderValues("Content-Type")) + "] is not supported"; Strings.collectionToCommaDelimitedString(restRequest.getAllHeaderValues("Content-Type")) + "] is not supported";
} }
channel.sendResponse(BytesRestResponse.createSimpleErrorResponse(NOT_ACCEPTABLE, errorMessage)); channel.sendResponse(BytesRestResponse.createSimpleErrorResponse(channel, NOT_ACCEPTABLE, errorMessage));
} }
/** /**

View File

@ -41,7 +41,7 @@ import org.elasticsearch.common.component.Lifecycle;
import org.elasticsearch.common.compress.Compressor; import org.elasticsearch.common.compress.Compressor;
import org.elasticsearch.common.compress.CompressorFactory; import org.elasticsearch.common.compress.CompressorFactory;
import org.elasticsearch.common.compress.NotCompressedException; import org.elasticsearch.common.compress.NotCompressedException;
import org.elasticsearch.common.io.ReleasableBytesStream; import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput; import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
@ -1025,10 +1025,8 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
} }
status = TransportStatus.setRequest(status); status = TransportStatus.setRequest(status);
ReleasableBytesStreamOutput bStream = new ReleasableBytesStreamOutput(bigArrays); ReleasableBytesStreamOutput bStream = new ReleasableBytesStreamOutput(bigArrays);
// we wrap this in a release once since if the onRequestSent callback throws an exception boolean addedReleaseListener = false;
// we might release things twice and this should be prevented StreamOutput stream = Streams.flushOnCloseStream(bStream);
final Releasable toRelease = Releasables.releaseOnce(() -> Releasables.close(bStream.bytes()));
StreamOutput stream = bStream;
try { try {
// only compress if asked, and, the request is not bytes, since then only // only compress if asked, and, the request is not bytes, since then only
// the header part is compressed, and the "body" can't be extracted as compressed // the header part is compressed, and the "body" can't be extracted as compressed
@ -1047,12 +1045,17 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
stream.writeString(action); stream.writeString(action);
BytesReference message = buildMessage(requestId, status, node.getVersion(), request, stream, bStream); BytesReference message = buildMessage(requestId, status, node.getVersion(), request, stream, bStream);
final TransportRequestOptions finalOptions = options; final TransportRequestOptions finalOptions = options;
final StreamOutput finalStream = stream;
// this might be called in a different thread // this might be called in a different thread
SendListener onRequestSent = new SendListener(toRelease, SendListener onRequestSent = new SendListener(
() -> IOUtils.closeWhileHandlingException(finalStream, bStream),
() -> transportServiceAdapter.onRequestSent(node, requestId, action, request, finalOptions)); () -> transportServiceAdapter.onRequestSent(node, requestId, action, request, finalOptions));
internalSendMessage(targetChannel, message, onRequestSent); internalSendMessage(targetChannel, message, onRequestSent);
addedReleaseListener = true;
} finally { } finally {
IOUtils.close(stream); if (!addedReleaseListener) {
IOUtils.close(stream, bStream);
}
} }
} }
@ -1114,10 +1117,8 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
} }
status = TransportStatus.setResponse(status); // TODO share some code with sendRequest status = TransportStatus.setResponse(status); // TODO share some code with sendRequest
ReleasableBytesStreamOutput bStream = new ReleasableBytesStreamOutput(bigArrays); ReleasableBytesStreamOutput bStream = new ReleasableBytesStreamOutput(bigArrays);
// we wrap this in a release once since if the onRequestSent callback throws an exception boolean addedReleaseListener = false;
// we might release things twice and this should be prevented StreamOutput stream = Streams.flushOnCloseStream(bStream);
final Releasable toRelease = Releasables.releaseOnce(() -> Releasables.close(bStream.bytes()));
StreamOutput stream = bStream;
try { try {
if (options.compress()) { if (options.compress()) {
status = TransportStatus.setCompress(status); status = TransportStatus.setCompress(status);
@ -1128,12 +1129,16 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
BytesReference reference = buildMessage(requestId, status, nodeVersion, response, stream, bStream); BytesReference reference = buildMessage(requestId, status, nodeVersion, response, stream, bStream);
final TransportResponseOptions finalOptions = options; final TransportResponseOptions finalOptions = options;
final StreamOutput finalStream = stream;
// this might be called in a different thread // this might be called in a different thread
SendListener listener = new SendListener(toRelease, SendListener listener = new SendListener(() -> IOUtils.closeWhileHandlingException(finalStream, bStream),
() -> transportServiceAdapter.onResponseSent(requestId, action, response, finalOptions)); () -> transportServiceAdapter.onResponseSent(requestId, action, response, finalOptions));
internalSendMessage(channel, reference, listener); internalSendMessage(channel, reference, listener);
addedReleaseListener = true;
} finally { } finally {
IOUtils.close(stream); if (!addedReleaseListener) {
IOUtils.close(stream, bStream);
}
} }
} }
@ -1161,7 +1166,7 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
* Serializes the given message into a bytes representation * Serializes the given message into a bytes representation
*/ */
private BytesReference buildMessage(long requestId, byte status, Version nodeVersion, TransportMessage message, StreamOutput stream, private BytesReference buildMessage(long requestId, byte status, Version nodeVersion, TransportMessage message, StreamOutput stream,
ReleasableBytesStream writtenBytes) throws IOException { ReleasableBytesStreamOutput writtenBytes) throws IOException {
final BytesReference zeroCopyBuffer; final BytesReference zeroCopyBuffer;
if (message instanceof BytesTransportRequest) { // what a shitty optimization - we should use a direct send method instead if (message instanceof BytesTransportRequest) { // what a shitty optimization - we should use a direct send method instead
BytesTransportRequest bRequest = (BytesTransportRequest) message; BytesTransportRequest bRequest = (BytesTransportRequest) message;

View File

@ -0,0 +1,51 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.io.stream;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.MockBigArrays;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.test.ESTestCase;
import java.io.IOException;
public class ReleasableBytesStreamOutputTests extends ESTestCase {
public void testRelease() throws Exception {
MockBigArrays mockBigArrays =
new MockBigArrays(Settings.EMPTY, new NoneCircuitBreakerService());
try (ReleasableBytesStreamOutput output =
getRandomReleasableBytesStreamOutput(mockBigArrays)) {
output.writeBoolean(randomBoolean());
}
MockBigArrays.ensureAllArraysAreReleased();
}
private ReleasableBytesStreamOutput getRandomReleasableBytesStreamOutput(
MockBigArrays mockBigArrays) throws IOException {
ReleasableBytesStreamOutput output = new ReleasableBytesStreamOutput(mockBigArrays);
if (randomBoolean()) {
for (int i = 0; i < scaledRandomIntBetween(1, 32); i++) {
output.write(randomByte());
}
}
return output;
}
}

View File

@ -85,7 +85,7 @@ final class Netty4HttpChannel extends AbstractRestChannel {
} }
@Override @Override
public BytesStreamOutput newBytesOutput() { protected BytesStreamOutput newBytesOutput() {
return new ReleasableBytesStreamOutput(transport.bigArrays); return new ReleasableBytesStreamOutput(transport.bigArrays);
} }
@ -114,7 +114,8 @@ final class Netty4HttpChannel extends AbstractRestChannel {
addCustomHeaders(resp, threadContext.getResponseHeaders()); addCustomHeaders(resp, threadContext.getResponseHeaders());
BytesReference content = response.content(); BytesReference content = response.content();
boolean release = content instanceof Releasable; boolean releaseContent = content instanceof Releasable;
boolean releaseBytesStreamOutput = bytesOutputOrNull() instanceof ReleasableBytesStreamOutput;
try { try {
// If our response doesn't specify a content-type header, set one // If our response doesn't specify a content-type header, set one
setHeaderField(resp, HttpHeaderNames.CONTENT_TYPE.toString(), response.contentType(), false); setHeaderField(resp, HttpHeaderNames.CONTENT_TYPE.toString(), response.contentType(), false);
@ -125,10 +126,14 @@ final class Netty4HttpChannel extends AbstractRestChannel {
final ChannelPromise promise = channel.newPromise(); final ChannelPromise promise = channel.newPromise();
if (release) { if (releaseContent) {
promise.addListener(f -> ((Releasable)content).close()); promise.addListener(f -> ((Releasable)content).close());
} }
if (releaseBytesStreamOutput) {
promise.addListener(f -> bytesOutputOrNull().close());
}
if (isCloseConnection()) { if (isCloseConnection()) {
promise.addListener(ChannelFutureListener.CLOSE); promise.addListener(ChannelFutureListener.CLOSE);
} }
@ -140,11 +145,15 @@ final class Netty4HttpChannel extends AbstractRestChannel {
msg = resp; msg = resp;
} }
channel.writeAndFlush(msg, promise); channel.writeAndFlush(msg, promise);
release = false; releaseContent = false;
releaseBytesStreamOutput = false;
} finally { } finally {
if (release) { if (releaseContent) {
((Releasable) content).close(); ((Releasable) content).close();
} }
if (releaseBytesStreamOutput) {
bytesOutputOrNull().close();
}
if (pipelinedRequest != null) { if (pipelinedRequest != null) {
pipelinedRequest.release(); pipelinedRequest.release();
} }

View File

@ -43,18 +43,24 @@ import io.netty.util.Attribute;
import io.netty.util.AttributeKey; import io.netty.util.AttributeKey;
import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.bytes.ReleasablePagedBytesReference; import org.elasticsearch.common.bytes.ReleasablePagedBytesReference;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput;
import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.ByteArray; import org.elasticsearch.common.util.ByteArray;
import org.elasticsearch.common.util.MockBigArrays; import org.elasticsearch.common.util.MockBigArrays;
import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.http.HttpTransportSettings; import org.elasticsearch.http.HttpTransportSettings;
import org.elasticsearch.http.NullDispatcher; import org.elasticsearch.http.NullDispatcher;
import org.elasticsearch.http.netty4.cors.Netty4CorsHandler; import org.elasticsearch.http.netty4.cors.Netty4CorsHandler;
import org.elasticsearch.http.netty4.pipelining.HttpPipelinedRequest; import org.elasticsearch.http.netty4.pipelining.HttpPipelinedRequest;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.rest.BytesRestResponse;
import org.elasticsearch.rest.RestResponse; import org.elasticsearch.rest.RestResponse;
import org.elasticsearch.rest.RestStatus; import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
@ -64,6 +70,7 @@ import org.elasticsearch.transport.netty4.Netty4Utils;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
import java.io.IOException;
import java.io.UnsupportedEncodingException; import java.io.UnsupportedEncodingException;
import java.net.SocketAddress; import java.net.SocketAddress;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
@ -78,6 +85,7 @@ import static org.elasticsearch.http.HttpTransportSettings.SETTING_CORS_ENABLED;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.Matchers.nullValue;
@ -243,6 +251,37 @@ public class Netty4HttpChannelTests extends ESTestCase {
} }
} }
public void testReleaseOnSendToChannelAfterException() throws IOException {
final Settings settings = Settings.builder().build();
final NamedXContentRegistry registry = xContentRegistry();
try (Netty4HttpServerTransport httpServerTransport =
new Netty4HttpServerTransport(settings, networkService, bigArrays, threadPool, registry, new NullDispatcher())) {
final FullHttpRequest httpRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/");
final EmbeddedChannel embeddedChannel = new EmbeddedChannel();
final Netty4HttpRequest request = new Netty4HttpRequest(registry, httpRequest, embeddedChannel);
final HttpPipelinedRequest pipelinedRequest = randomBoolean() ? new HttpPipelinedRequest(request.request(), 1) : null;
final Netty4HttpChannel channel =
new Netty4HttpChannel(httpServerTransport, request, pipelinedRequest, randomBoolean(), threadPool.getThreadContext());
final BytesRestResponse response = new BytesRestResponse(RestStatus.INTERNAL_SERVER_ERROR,
JsonXContent.contentBuilder().startObject().endObject());
assertThat(response.content(), not(instanceOf(Releasable.class)));
// ensure we have reserved bytes
if (randomBoolean()) {
BytesStreamOutput out = channel.bytesOutput();
assertThat(out, instanceOf(ReleasableBytesStreamOutput.class));
} else {
try (XContentBuilder builder = channel.newBuilder()) {
// do something builder
builder.startObject().endObject();
}
}
channel.sendResponse(response);
// ESTestCase#after will invoke ensureAllArraysAreReleased which will fail if the response content was not released
}
}
public void testConnectionClose() throws Exception { public void testConnectionClose() throws Exception {
final Settings settings = Settings.builder().build(); final Settings settings = Settings.builder().build();
try (Netty4HttpServerTransport httpServerTransport = try (Netty4HttpServerTransport httpServerTransport =
@ -549,7 +588,7 @@ public class Netty4HttpChannelTests extends ESTestCase {
} }
final ByteArray bigArray = bigArrays.newByteArray(bytes.length); final ByteArray bigArray = bigArrays.newByteArray(bytes.length);
bigArray.set(0, bytes, 0, bytes.length); bigArray.set(0, bytes, 0, bytes.length);
reference = new ReleasablePagedBytesReference(bigArrays, bigArray, bytes.length); reference = new ReleasablePagedBytesReference(bigArrays, bigArray, bytes.length, Releasables.releaseOnce(bigArray));
} }
@Override @Override