Ensure that releasing listener is called

When sending a response to a client, we attach a releasing listener to
the channel promise. If the client disappears before the response is
sent, the releasing listener was never notified. The reason the
listeners were never notified was due to a mistaken invocation of write
and flush on the channel which has two overrides: one that takes an
existing promise, and one that does not and instead creates a new
promise. When the client disappears, it is this latter promise that is
notified, which does not contain the releasing listener. This commit
addreses this issue by invoking the override that passes our channel
promise through.

Relates #23310
This commit is contained in:
Jason Tedor 2017-02-22 13:54:17 -05:00 committed by GitHub
parent 6f1ed8a3d1
commit 708d11f54a
8 changed files with 58 additions and 31 deletions

View File

@ -127,19 +127,20 @@ final class Netty4HttpChannel extends AbstractRestChannel {
if (release) {
promise.addListener(f -> ((Releasable)content).close());
release = false;
}
if (isCloseConnection()) {
promise.addListener(ChannelFutureListener.CLOSE);
}
final Object msg;
if (pipelinedRequest != null) {
channel.writeAndFlush(pipelinedRequest.createHttpResponse(resp, promise));
msg = pipelinedRequest.createHttpResponse(resp);
} else {
channel.writeAndFlush(resp, promise);
msg = resp;
}
channel.writeAndFlush(msg, promise);
release = false;
} finally {
if (release) {
((Releasable) content).close();

View File

@ -69,7 +69,6 @@ import org.elasticsearch.http.netty4.cors.Netty4CorsConfig;
import org.elasticsearch.http.netty4.cors.Netty4CorsConfigBuilder;
import org.elasticsearch.http.netty4.cors.Netty4CorsHandler;
import org.elasticsearch.http.netty4.pipelining.HttpPipeliningHandler;
import org.elasticsearch.monitor.jvm.JvmInfo;
import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.RestUtils;

View File

@ -19,10 +19,7 @@
package org.elasticsearch.http.netty4.pipelining;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.util.ReferenceCounted;
@ -36,7 +33,7 @@ public class HttpPipelinedRequest implements ReferenceCounted {
private final int sequence;
HttpPipelinedRequest(final LastHttpContent last, final int sequence) {
public HttpPipelinedRequest(final LastHttpContent last, final int sequence) {
this.last = last;
this.sequence = sequence;
}
@ -45,8 +42,8 @@ public class HttpPipelinedRequest implements ReferenceCounted {
return last;
}
public HttpPipelinedResponse createHttpResponse(final FullHttpResponse response, final ChannelPromise promise) {
return new HttpPipelinedResponse(response, promise, sequence);
public HttpPipelinedResponse createHttpResponse(final FullHttpResponse response) {
return new HttpPipelinedResponse(response, sequence);
}
@Override

View File

@ -19,20 +19,16 @@ package org.elasticsearch.http.netty4.pipelining;
* under the License.
*/
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.util.ReferenceCounted;
class HttpPipelinedResponse implements Comparable<HttpPipelinedResponse>, ReferenceCounted {
private final FullHttpResponse response;
private final ChannelPromise promise;
private final int sequence;
HttpPipelinedResponse(FullHttpResponse response, ChannelPromise promise, int sequence) {
HttpPipelinedResponse(FullHttpResponse response, int sequence) {
this.response = response;
this.promise = promise;
this.sequence = sequence;
}
@ -40,10 +36,6 @@ class HttpPipelinedResponse implements Comparable<HttpPipelinedResponse>, Refere
return response;
}
public ChannelPromise promise() {
return promise;
}
public int sequence() {
return sequence;
}

View File

@ -24,9 +24,6 @@ import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.util.ReferenceCountUtil;
import org.elasticsearch.action.termvectors.TermVectorsFilter;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.transport.netty4.Netty4Utils;
import java.util.Collections;
@ -84,7 +81,7 @@ public class HttpPipeliningHandler extends ChannelDuplexHandler {
break;
}
holdingQueue.remove();
ctx.write(response.response(), response.promise());
ctx.write(response.response(), promise);
writeSequence++;
}
} else {

View File

@ -41,14 +41,19 @@ import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.util.Attribute;
import io.netty.util.AttributeKey;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.bytes.ReleasablePagedBytesReference;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.ByteArray;
import org.elasticsearch.common.util.MockBigArrays;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.http.HttpTransportSettings;
import org.elasticsearch.http.NullDispatcher;
import org.elasticsearch.http.netty4.cors.Netty4CorsHandler;
import org.elasticsearch.http.netty4.pipelining.HttpPipelinedRequest;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.rest.RestResponse;
import org.elasticsearch.rest.RestStatus;
@ -59,6 +64,7 @@ import org.elasticsearch.transport.netty4.Netty4Utils;
import org.junit.After;
import org.junit.Before;
import java.io.UnsupportedEncodingException;
import java.net.SocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
@ -70,6 +76,7 @@ import static org.elasticsearch.http.HttpTransportSettings.SETTING_CORS_ALLOW_ME
import static org.elasticsearch.http.HttpTransportSettings.SETTING_CORS_ALLOW_ORIGIN;
import static org.elasticsearch.http.HttpTransportSettings.SETTING_CORS_ENABLED;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
@ -217,6 +224,25 @@ public class Netty4HttpChannelTests extends ESTestCase {
}
}
public void testReleaseOnSendToClosedChannel() {
final Settings settings = Settings.builder().build();
final NamedXContentRegistry registry = xContentRegistry();
try (Netty4HttpServerTransport httpServerTransport =
new Netty4HttpServerTransport(settings, networkService, bigArrays, threadPool, registry, new NullDispatcher())) {
final FullHttpRequest httpRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/");
final EmbeddedChannel embeddedChannel = new EmbeddedChannel();
final Netty4HttpRequest request = new Netty4HttpRequest(registry, httpRequest, embeddedChannel);
final HttpPipelinedRequest pipelinedRequest = randomBoolean() ? new HttpPipelinedRequest(request.request(), 1) : null;
final Netty4HttpChannel channel =
new Netty4HttpChannel(httpServerTransport, request, pipelinedRequest, randomBoolean(), threadPool.getThreadContext());
final TestResponse response = new TestResponse(bigArrays);
assertThat(response.content(), instanceOf(Releasable.class));
embeddedChannel.close();
channel.sendResponse(response);
// ESTestCase#after will invoke ensureAllArraysAreReleased which will fail if the response content was not released
}
}
public void testConnectionClose() throws Exception {
final Settings settings = Settings.builder().build();
try (Netty4HttpServerTransport httpServerTransport =
@ -508,6 +534,24 @@ public class Netty4HttpChannelTests extends ESTestCase {
private static class TestResponse extends RestResponse {
private final BytesReference reference;
TestResponse() {
reference = Netty4Utils.toBytesReference(Unpooled.copiedBuffer("content", StandardCharsets.UTF_8));
}
TestResponse(final BigArrays bigArrays) {
final byte[] bytes;
try {
bytes = "content".getBytes("UTF-8");
} catch (final UnsupportedEncodingException e) {
throw new AssertionError(e);
}
final ByteArray bigArray = bigArrays.newByteArray(bytes.length);
bigArray.set(0, bytes, 0, bytes.length);
reference = new ReleasablePagedBytesReference(bigArrays, bigArray, bytes.length);
}
@Override
public String contentType() {
return "text";
@ -515,7 +559,7 @@ public class Netty4HttpChannelTests extends ESTestCase {
@Override
public BytesReference content() {
return Netty4Utils.toBytesReference(Unpooled.copiedBuffer("content", StandardCharsets.UTF_8));
return reference;
}
@Override

View File

@ -256,7 +256,7 @@ public class Netty4HttpServerPipeliningTests extends ESTestCase {
}
if (pipelinedRequest != null) {
ctx.writeAndFlush(pipelinedRequest.createHttpResponse(httpResponse, ctx.channel().newPromise()));
ctx.writeAndFlush(pipelinedRequest.createHttpResponse(httpResponse));
} else {
ctx.writeAndFlush(httpResponse);
}

View File

@ -23,7 +23,6 @@ import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.codec.http.DefaultFullHttpRequest;
@ -37,7 +36,6 @@ import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.handler.codec.http.QueryStringDecoder;
import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.util.concurrent.CountDown;
import org.elasticsearch.test.ESTestCase;
import org.junit.After;
@ -248,8 +246,7 @@ public class Netty4HttpPipeliningHandlerTests extends ESTestCase {
executorService.submit(() -> {
try {
waitingLatch.await(1000, TimeUnit.SECONDS);
final ChannelPromise promise = ctx.newPromise();
ctx.write(pipelinedRequest.createHttpResponse(httpResponse, promise), promise);
ctx.write(pipelinedRequest.createHttpResponse(httpResponse), ctx.newPromise());
finishingLatch.countDown();
} catch (InterruptedException e) {
fail(e.toString());