NIFI-13807 Fixed Stream Handling in Runtime Management Server HTTP Exchanges (#9316)

- Drained request InputStream
- Added try-with-resources for response OutputStream

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
Mark Payne 2024-09-26 11:57:37 -04:00 committed by GitHub
parent 1548a21219
commit f39cce5dec
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 102 additions and 50 deletions

View File

@ -22,6 +22,7 @@ import org.apache.nifi.NiFiServer;
import org.apache.nifi.cluster.ClusterDetailsFactory;
import org.apache.nifi.cluster.ConnectionState;
import org.apache.nifi.controller.DecommissionTask;
import org.apache.nifi.util.HttpExchangeUtils;
import java.io.IOException;
import java.io.OutputStream;
@ -57,28 +58,30 @@ class HealthClusterHttpHandler implements HttpHandler {
@Override
public void handle(final HttpExchange exchange) throws IOException {
HttpExchangeUtils.drainRequestBody(exchange);
final String requestMethod = exchange.getRequestMethod();
final OutputStream responseBody = exchange.getResponseBody();
try (final OutputStream responseBody = exchange.getResponseBody()) {
if (GET_METHOD.contentEquals(requestMethod)) {
exchange.getResponseHeaders().set(CONTENT_TYPE_HEADER, TEXT_PLAIN);
final ConnectionState connectionState = getConnectionState();
final String status = STATUS.formatted(connectionState);
final byte[] response = status.getBytes(StandardCharsets.UTF_8);
final int responseCode = getResponseCode(connectionState);
exchange.sendResponseHeaders(responseCode, response.length);
responseBody.write(response);
} else if (DELETE_METHOD.contentEquals(requestMethod)) {
startDecommission();
if (GET_METHOD.contentEquals(requestMethod)) {
exchange.getResponseHeaders().set(CONTENT_TYPE_HEADER, TEXT_PLAIN);
final ConnectionState connectionState = getConnectionState();
final String status = STATUS.formatted(connectionState);
final byte[] response = status.getBytes(StandardCharsets.UTF_8);
final int responseCode = getResponseCode(connectionState);
exchange.sendResponseHeaders(responseCode, response.length);
responseBody.write(response);
} else if (DELETE_METHOD.contentEquals(requestMethod)) {
startDecommission();
exchange.getResponseHeaders().set(CONTENT_TYPE_HEADER, TEXT_PLAIN);
final String status = STATUS.formatted(ConnectionState.OFFLOADING);
final byte[] response = status.getBytes(StandardCharsets.UTF_8);
exchange.sendResponseHeaders(HTTP_ACCEPTED, response.length);
responseBody.write(response);
} else {
exchange.sendResponseHeaders(HTTP_BAD_METHOD, NO_RESPONSE_BODY);
exchange.getResponseHeaders().set(CONTENT_TYPE_HEADER, TEXT_PLAIN);
final String status = STATUS.formatted(ConnectionState.OFFLOADING);
final byte[] response = status.getBytes(StandardCharsets.UTF_8);
exchange.sendResponseHeaders(HTTP_ACCEPTED, response.length);
responseBody.write(response);
} else {
exchange.sendResponseHeaders(HTTP_BAD_METHOD, NO_RESPONSE_BODY);
}
}
}

View File

@ -21,6 +21,7 @@ import com.sun.net.httpserver.HttpHandler;
import org.apache.nifi.NiFiServer;
import org.apache.nifi.diagnostics.DiagnosticsDump;
import org.apache.nifi.diagnostics.DiagnosticsFactory;
import org.apache.nifi.util.HttpExchangeUtils;
import java.io.IOException;
import java.io.OutputStream;
@ -54,22 +55,23 @@ class HealthDiagnosticsHttpHandler implements HttpHandler {
@Override
public void handle(final HttpExchange exchange) throws IOException {
HttpExchangeUtils.drainRequestBody(exchange);
final String requestMethod = exchange.getRequestMethod();
try (final OutputStream responseBody = exchange.getResponseBody()) {
if (GET_METHOD.contentEquals(requestMethod)) {
exchange.getResponseHeaders().set(CONTENT_TYPE_HEADER, TEXT_PLAIN);
exchange.sendResponseHeaders(HTTP_OK, STREAM_RESPONSE_BODY);
if (GET_METHOD.contentEquals(requestMethod)) {
exchange.getResponseHeaders().set(CONTENT_TYPE_HEADER, TEXT_PLAIN);
exchange.sendResponseHeaders(HTTP_OK, STREAM_RESPONSE_BODY);
final URI requestUri = exchange.getRequestURI();
final boolean verboseRequested = getVerboseRequested(requestUri);
final URI requestUri = exchange.getRequestURI();
final boolean verboseRequested = getVerboseRequested(requestUri);
final DiagnosticsFactory diagnosticsFactory = server.getDiagnosticsFactory();
final DiagnosticsDump diagnosticsDump = diagnosticsFactory.create(verboseRequested);
try (OutputStream responseBody = exchange.getResponseBody()) {
final DiagnosticsFactory diagnosticsFactory = server.getDiagnosticsFactory();
final DiagnosticsDump diagnosticsDump = diagnosticsFactory.create(verboseRequested);
diagnosticsDump.writeTo(responseBody);
} else {
exchange.sendResponseHeaders(HTTP_BAD_METHOD, NO_RESPONSE_BODY);
}
} else {
exchange.sendResponseHeaders(HTTP_BAD_METHOD, NO_RESPONSE_BODY);
}
}

View File

@ -18,6 +18,7 @@ package org.apache.nifi.runtime;
import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler;
import org.apache.nifi.util.HttpExchangeUtils;
import java.io.IOException;
import java.io.OutputStream;
@ -42,17 +43,19 @@ class HealthHttpHandler implements HttpHandler {
@Override
public void handle(final HttpExchange exchange) throws IOException {
HttpExchangeUtils.drainRequestBody(exchange);
final String requestMethod = exchange.getRequestMethod();
final OutputStream responseBody = exchange.getResponseBody();
if (GET_METHOD.contentEquals(requestMethod)) {
exchange.getResponseHeaders().set(CONTENT_TYPE_HEADER, TEXT_PLAIN);
final byte[] response = STATUS_UP.getBytes(StandardCharsets.UTF_8);
exchange.sendResponseHeaders(HTTP_OK, response.length);
responseBody.write(response);
} else {
exchange.sendResponseHeaders(HTTP_BAD_METHOD, NO_RESPONSE_BODY);
try (final OutputStream responseBody = exchange.getResponseBody()) {
if (GET_METHOD.contentEquals(requestMethod)) {
exchange.getResponseHeaders().set(CONTENT_TYPE_HEADER, TEXT_PLAIN);
final byte[] response = STATUS_UP.getBytes(StandardCharsets.UTF_8);
exchange.sendResponseHeaders(HTTP_OK, response.length);
responseBody.write(response);
} else {
exchange.sendResponseHeaders(HTTP_BAD_METHOD, NO_RESPONSE_BODY);
}
}
}
}

View File

@ -21,6 +21,7 @@ import com.sun.net.httpserver.HttpHandler;
import org.apache.nifi.NiFiServer;
import org.apache.nifi.controller.status.history.StatusHistoryDump;
import org.apache.nifi.controller.status.history.StatusHistoryDumpFactory;
import org.apache.nifi.util.HttpExchangeUtils;
import java.io.IOException;
import java.io.OutputStream;
@ -60,23 +61,24 @@ class HealthStatusHistoryHttpHandler implements HttpHandler {
@Override
public void handle(final HttpExchange exchange) throws IOException {
HttpExchangeUtils.drainRequestBody(exchange);
final String requestMethod = exchange.getRequestMethod();
if (GET_METHOD.contentEquals(requestMethod)) {
exchange.getResponseHeaders().set(CONTENT_TYPE_HEADER, APPLICATION_JSON);
exchange.sendResponseHeaders(HTTP_OK, STREAM_RESPONSE_BODY);
try (final OutputStream responseBody = exchange.getResponseBody()) {
if (GET_METHOD.contentEquals(requestMethod)) {
exchange.getResponseHeaders().set(CONTENT_TYPE_HEADER, APPLICATION_JSON);
exchange.sendResponseHeaders(HTTP_OK, STREAM_RESPONSE_BODY);
final URI requestUri = exchange.getRequestURI();
final int daysRequested = getDaysRequested(requestUri);
final URI requestUri = exchange.getRequestURI();
final int daysRequested = getDaysRequested(requestUri);
final StatusHistoryDumpFactory statusHistoryDumpFactory = server.getStatusHistoryDumpFactory();
final StatusHistoryDump statusHistoryDump = statusHistoryDumpFactory.create(daysRequested);
try (OutputStream responseBody = exchange.getResponseBody()) {
final StatusHistoryDumpFactory statusHistoryDumpFactory = server.getStatusHistoryDumpFactory();
final StatusHistoryDump statusHistoryDump = statusHistoryDumpFactory.create(daysRequested);
statusHistoryDump.writeTo(responseBody);
} else {
exchange.sendResponseHeaders(HTTP_BAD_METHOD, NO_RESPONSE_BODY);
}
} else {
exchange.sendResponseHeaders(HTTP_BAD_METHOD, NO_RESPONSE_BODY);
}
}

View File

@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.util;
import com.sun.net.httpserver.HttpExchange;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.InputStream;
public class HttpExchangeUtils {
private static final Logger logger = LoggerFactory.getLogger(HttpExchangeUtils.class);
public static void drainRequestBody(final HttpExchange exchange) {
final byte[] buffer = new byte[4096];
try (final InputStream in = exchange.getRequestBody()) {
while ((in.read(buffer)) != -1) {
// Ignore the data read, just drain the input stream
}
} catch (final IOException ioe) {
// Since we don't actually care about the contents of the input, we will ignore any Exceptions when reading from it.
logger.debug("Failed to fully drain HttpExchange InputStream from {}", exchange.getRequestURI(), ioe);
}
}
}