Bubble exceptions when closing compressible streams

Compressible bytes output stream swallows exceptions that occur when
closing. This commit changes this behavior so that such exceptions
bubble up.

Relates #27542
This commit is contained in:
Jason Tedor 2017-11-27 13:48:04 -05:00 committed by GitHub
parent f23ed6188d
commit 379d51fcfa
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 20 additions and 12 deletions

View File

@ -43,7 +43,7 @@ import java.util.zip.DeflaterOutputStream;
* {@link CompressibleBytesOutputStream#close()} should be called when the bytes are no longer needed and * {@link CompressibleBytesOutputStream#close()} should be called when the bytes are no longer needed and
* can be safely released. * can be safely released.
*/ */
final class CompressibleBytesOutputStream extends StreamOutput implements Releasable { final class CompressibleBytesOutputStream extends StreamOutput {
private final StreamOutput stream; private final StreamOutput stream;
private final BytesStream bytesStreamOutput; private final BytesStream bytesStreamOutput;
@ -92,13 +92,13 @@ final class CompressibleBytesOutputStream extends StreamOutput implements Releas
} }
@Override @Override
public void close() { public void close() throws IOException {
if (stream == bytesStreamOutput) { if (stream == bytesStreamOutput) {
assert shouldCompress == false : "If the streams are the same we should not be compressing"; assert shouldCompress == false : "If the streams are the same we should not be compressing";
IOUtils.closeWhileHandlingException(stream); IOUtils.close(stream);
} else { } else {
assert shouldCompress : "If the streams are different we should be compressing"; assert shouldCompress : "If the streams are different we should be compressing";
IOUtils.closeWhileHandlingException(stream, bytesStreamOutput); IOUtils.close(stream, bytesStreamOutput);
} }
} }

View File

@ -49,7 +49,6 @@ import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.metrics.CounterMetric; import org.elasticsearch.common.metrics.CounterMetric;
import org.elasticsearch.common.metrics.MeanMetric; import org.elasticsearch.common.metrics.MeanMetric;
import org.elasticsearch.common.network.NetworkAddress; import org.elasticsearch.common.network.NetworkAddress;
@ -73,8 +72,10 @@ import org.elasticsearch.monitor.jvm.JvmInfo;
import org.elasticsearch.rest.RestStatus; import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import java.io.Closeable;
import java.io.IOException; import java.io.IOException;
import java.io.StreamCorruptedException; import java.io.StreamCorruptedException;
import java.io.UncheckedIOException;
import java.net.BindException; import java.net.BindException;
import java.net.InetAddress; import java.net.InetAddress;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
@ -1704,29 +1705,36 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
private final class SendListener extends SendMetricListener { private final class SendListener extends SendMetricListener {
private final TcpChannel channel; private final TcpChannel channel;
private final Releasable optionalReleasable; private final Closeable optionalCloseable;
private final Runnable transportAdaptorCallback; private final Runnable transportAdaptorCallback;
private SendListener(TcpChannel channel, Releasable optionalReleasable, Runnable transportAdaptorCallback, long messageLength) { private SendListener(TcpChannel channel, Closeable optionalCloseable, Runnable transportAdaptorCallback, long messageLength) {
super(messageLength); super(messageLength);
this.channel = channel; this.channel = channel;
this.optionalReleasable = optionalReleasable; this.optionalCloseable = optionalCloseable;
this.transportAdaptorCallback = transportAdaptorCallback; this.transportAdaptorCallback = transportAdaptorCallback;
} }
@Override @Override
protected void innerInnerOnResponse(Void v) { protected void innerInnerOnResponse(Void v) {
release(); closeAndCallback(null);
} }
@Override @Override
protected void innerOnFailure(Exception e) { protected void innerOnFailure(Exception e) {
logger.warn(() -> new ParameterizedMessage("send message failed [channel: {}]", channel), e); logger.warn(() -> new ParameterizedMessage("send message failed [channel: {}]", channel), e);
release(); closeAndCallback(e);
} }
private void release() { private void closeAndCallback(final Exception e) {
Releasables.close(optionalReleasable, transportAdaptorCallback::run); try {
IOUtils.close(optionalCloseable, transportAdaptorCallback::run);
} catch (final IOException inner) {
if (e != null) {
inner.addSuppressed(e);
}
throw new UncheckedIOException(inner);
}
} }
} }