NIFI-13647 Removed Socket Close from ListenOTLP Error Responses (#9166)

This commit is contained in:
David Handermann 2024-08-09 09:00:34 -05:00 committed by GitHub
parent 4f262dc7ea
commit 920edfc022
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 6 additions and 9 deletions

View File

@ -17,8 +17,6 @@
package org.apache.nifi.processors.opentelemetry.server; package org.apache.nifi.processors.opentelemetry.server;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.DefaultFullHttpResponse; import io.netty.handler.codec.http.DefaultFullHttpResponse;
@ -87,7 +85,7 @@ public class HttpRequestHandler extends SimpleChannelInboundHandler<FullHttpRequ
if (HttpMethod.POST == httpRequest.method()) { if (HttpMethod.POST == httpRequest.method()) {
handleHttpPostRequest(channelHandlerContext, httpRequest); handleHttpPostRequest(channelHandlerContext, httpRequest);
} else { } else {
sendCloseResponse(channelHandlerContext, httpRequest, HttpResponseStatus.METHOD_NOT_ALLOWED); sendErrorResponse(channelHandlerContext, httpRequest, HttpResponseStatus.METHOD_NOT_ALLOWED);
} }
} }
@ -96,12 +94,12 @@ public class HttpRequestHandler extends SimpleChannelInboundHandler<FullHttpRequ
final String requestContentType = headers.get(HttpHeaderNames.CONTENT_TYPE); final String requestContentType = headers.get(HttpHeaderNames.CONTENT_TYPE);
final TelemetryContentType telemetryContentType = getTelemetryContentType(requestContentType); final TelemetryContentType telemetryContentType = getTelemetryContentType(requestContentType);
if (telemetryContentType == null) { if (telemetryContentType == null) {
sendCloseResponse(channelHandlerContext, httpRequest, HttpResponseStatus.UNSUPPORTED_MEDIA_TYPE); sendErrorResponse(channelHandlerContext, httpRequest, HttpResponseStatus.UNSUPPORTED_MEDIA_TYPE);
} else { } else {
final String uri = httpRequest.uri(); final String uri = httpRequest.uri();
final TelemetryRequestType telemetryRequestType = getTelemetryRequestType(uri, telemetryContentType); final TelemetryRequestType telemetryRequestType = getTelemetryRequestType(uri, telemetryContentType);
if (telemetryRequestType == null) { if (telemetryRequestType == null) {
sendCloseResponse(channelHandlerContext, httpRequest, HttpResponseStatus.NOT_FOUND); sendErrorResponse(channelHandlerContext, httpRequest, HttpResponseStatus.NOT_FOUND);
} else { } else {
handleHttpPostRequestTypeSupported(channelHandlerContext, httpRequest, telemetryRequestType, telemetryContentType); handleHttpPostRequestTypeSupported(channelHandlerContext, httpRequest, telemetryRequestType, telemetryContentType);
} }
@ -204,19 +202,18 @@ public class HttpRequestHandler extends SimpleChannelInboundHandler<FullHttpRequ
return grpcStatusCode; return grpcStatusCode;
} }
private void sendCloseResponse(final ChannelHandlerContext channelHandlerContext, final HttpRequest httpRequest, final HttpResponseStatus httpResponseStatus) { private void sendErrorResponse(final ChannelHandlerContext channelHandlerContext, final HttpRequest httpRequest, final HttpResponseStatus httpResponseStatus) {
final SocketAddress remoteAddress = channelHandlerContext.channel().remoteAddress(); final SocketAddress remoteAddress = channelHandlerContext.channel().remoteAddress();
final HttpMethod method = httpRequest.method(); final HttpMethod method = httpRequest.method();
final String uri = httpRequest.uri(); final String uri = httpRequest.uri();
final HttpVersion httpVersion = httpRequest.protocolVersion(); final HttpVersion httpVersion = httpRequest.protocolVersion();
log.debug("HTTP Request Closed: Client Address [{}] Method [{}] URI [{}] Version [{}] HTTP {}", remoteAddress, method, uri, httpVersion, httpResponseStatus.code()); log.debug("HTTP Request Failed: Client Address [{}] Method [{}] URI [{}] Version [{}] HTTP {}", remoteAddress, method, uri, httpVersion, httpResponseStatus.code());
final FullHttpResponse response = new DefaultFullHttpResponse(httpVersion, httpResponseStatus); final FullHttpResponse response = new DefaultFullHttpResponse(httpVersion, httpResponseStatus);
setStreamId(httpRequest.headers(), response); setStreamId(httpRequest.headers(), response);
final ChannelFuture future = channelHandlerContext.writeAndFlush(response); channelHandlerContext.writeAndFlush(response);
future.addListener(ChannelFutureListener.CLOSE);
} }
private void sendResponse( private void sendResponse(