From a0c9bebe241e8960363490cc0378f22d034f8db7 Mon Sep 17 00:00:00 2001 From: Joe Percivall Date: Mon, 16 Apr 2018 22:36:26 -0400 Subject: [PATCH] NIFI-5085 In InvokeHttp, moving the OkHttp Response object to a try with resources --- .../nifi/processors/standard/InvokeHTTP.java | 218 +++++++++--------- 1 file changed, 110 insertions(+), 108 deletions(-) diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/InvokeHTTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/InvokeHTTP.java index 834dd9abf6..7ec6a877d4 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/InvokeHTTP.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/InvokeHTTP.java @@ -779,133 +779,135 @@ public final class InvokeHTTP extends AbstractProcessor { } final long startNanos = System.nanoTime(); - Response responseHttp = okHttpClient.newCall(httpRequest).execute(); - // output the raw response headers (DEBUG level only) - logResponse(logger, url, responseHttp); + try (Response responseHttp = okHttpClient.newCall(httpRequest).execute()) { + // output the raw response headers (DEBUG level only) + logResponse(logger, url, responseHttp); - // store the status code and message - int statusCode = responseHttp.code(); - String statusMessage = responseHttp.message(); + // store the status code and message + int statusCode = responseHttp.code(); + String statusMessage = responseHttp.message(); - if (statusCode == 0) { - throw new IllegalStateException("Status code unknown, connection hasn't been attempted."); - } - - // Create a map of the status attributes that are always written to the request and response FlowFiles - Map statusAttributes = new HashMap<>(); - statusAttributes.put(STATUS_CODE, String.valueOf(statusCode)); - statusAttributes.put(STATUS_MESSAGE, statusMessage); - statusAttributes.put(REQUEST_URL, url.toExternalForm()); - statusAttributes.put(TRANSACTION_ID, txId.toString()); - - if (requestFlowFile != null) { - requestFlowFile = session.putAllAttributes(requestFlowFile, statusAttributes); - } - - // If the property to add the response headers to the request flowfile is true then add them - if (context.getProperty(PROP_ADD_HEADERS_TO_REQUEST).asBoolean() && requestFlowFile != null) { - // write the response headers as attributes - // this will overwrite any existing flowfile attributes - requestFlowFile = session.putAllAttributes(requestFlowFile, convertAttributesFromHeaders(url, responseHttp)); - } - - boolean outputBodyToRequestAttribute = (!isSuccess(statusCode) || putToAttribute) && requestFlowFile != null; - boolean outputBodyToResponseContent = (isSuccess(statusCode) && !putToAttribute) || context.getProperty(PROP_OUTPUT_RESPONSE_REGARDLESS).asBoolean(); - ResponseBody responseBody = responseHttp.body(); - boolean bodyExists = responseBody != null; - - InputStream responseBodyStream = null; - SoftLimitBoundedByteArrayOutputStream outputStreamToRequestAttribute = null; - TeeInputStream teeInputStream = null; - try { - responseBodyStream = bodyExists ? responseBody.byteStream() : null; - if (responseBodyStream != null && outputBodyToRequestAttribute && outputBodyToResponseContent) { - outputStreamToRequestAttribute = new SoftLimitBoundedByteArrayOutputStream(maxAttributeSize); - teeInputStream = new TeeInputStream(responseBodyStream, outputStreamToRequestAttribute); + if (statusCode == 0) { + throw new IllegalStateException("Status code unknown, connection hasn't been attempted."); } - if (outputBodyToResponseContent) { - /* - * If successful and putting to response flowfile, store the response body as the flowfile payload - * we include additional flowfile attributes including the response headers and the status codes. - */ + // Create a map of the status attributes that are always written to the request and response FlowFiles + Map statusAttributes = new HashMap<>(); + statusAttributes.put(STATUS_CODE, String.valueOf(statusCode)); + statusAttributes.put(STATUS_MESSAGE, statusMessage); + statusAttributes.put(REQUEST_URL, url.toExternalForm()); + statusAttributes.put(TRANSACTION_ID, txId.toString()); - // clone the flowfile to capture the response - if (requestFlowFile != null) { - responseFlowFile = session.create(requestFlowFile); - } else { - responseFlowFile = session.create(); - } - - // write attributes to response flowfile - responseFlowFile = session.putAllAttributes(responseFlowFile, statusAttributes); + if (requestFlowFile != null) { + requestFlowFile = session.putAllAttributes(requestFlowFile, statusAttributes); + } + // If the property to add the response headers to the request flowfile is true then add them + if (context.getProperty(PROP_ADD_HEADERS_TO_REQUEST).asBoolean() && requestFlowFile != null) { // write the response headers as attributes // this will overwrite any existing flowfile attributes - responseFlowFile = session.putAllAttributes(responseFlowFile, convertAttributesFromHeaders(url, responseHttp)); + requestFlowFile = session.putAllAttributes(requestFlowFile, convertAttributesFromHeaders(url, responseHttp)); + } - // transfer the message body to the payload - // can potentially be null in edge cases - if (bodyExists) { - // write content type attribute to response flowfile if it is available - if (responseBody.contentType() != null) { - responseFlowFile = session.putAttribute(responseFlowFile, CoreAttributes.MIME_TYPE.key(), responseBody.contentType().toString()); - } - if (teeInputStream != null) { - responseFlowFile = session.importFrom(teeInputStream, responseFlowFile); + boolean outputBodyToRequestAttribute = (!isSuccess(statusCode) || putToAttribute) && requestFlowFile != null; + boolean outputBodyToResponseContent = (isSuccess(statusCode) && !putToAttribute) || context.getProperty(PROP_OUTPUT_RESPONSE_REGARDLESS).asBoolean(); + ResponseBody responseBody = responseHttp.body(); + boolean bodyExists = responseBody != null; + + InputStream responseBodyStream = null; + SoftLimitBoundedByteArrayOutputStream outputStreamToRequestAttribute = null; + TeeInputStream teeInputStream = null; + try { + responseBodyStream = bodyExists ? responseBody.byteStream() : null; + if (responseBodyStream != null && outputBodyToRequestAttribute && outputBodyToResponseContent) { + outputStreamToRequestAttribute = new SoftLimitBoundedByteArrayOutputStream(maxAttributeSize); + teeInputStream = new TeeInputStream(responseBodyStream, outputStreamToRequestAttribute); + } + + if (outputBodyToResponseContent) { + /* + * If successful and putting to response flowfile, store the response body as the flowfile payload + * we include additional flowfile attributes including the response headers and the status codes. + */ + + // clone the flowfile to capture the response + if (requestFlowFile != null) { + responseFlowFile = session.create(requestFlowFile); } else { - responseFlowFile = session.importFrom(responseBodyStream, responseFlowFile); + responseFlowFile = session.create(); } - // emit provenance event + // write attributes to response flowfile + responseFlowFile = session.putAllAttributes(responseFlowFile, statusAttributes); + + // write the response headers as attributes + // this will overwrite any existing flowfile attributes + responseFlowFile = session.putAllAttributes(responseFlowFile, convertAttributesFromHeaders(url, responseHttp)); + + // transfer the message body to the payload + // can potentially be null in edge cases + if (bodyExists) { + // write content type attribute to response flowfile if it is available + if (responseBody.contentType() != null) { + responseFlowFile = session.putAttribute(responseFlowFile, CoreAttributes.MIME_TYPE.key(), responseBody.contentType().toString()); + } + if (teeInputStream != null) { + responseFlowFile = session.importFrom(teeInputStream, responseFlowFile); + } else { + responseFlowFile = session.importFrom(responseBodyStream, responseFlowFile); + } + + // emit provenance event + final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos); + if(requestFlowFile != null) { + session.getProvenanceReporter().fetch(responseFlowFile, url.toExternalForm(), millis); + } else { + session.getProvenanceReporter().receive(responseFlowFile, url.toExternalForm(), millis); + } + } + } + + // if not successful and request flowfile is not null, store the response body into a flowfile attribute + if (outputBodyToRequestAttribute && bodyExists) { + String attributeKey = context.getProperty(PROP_PUT_OUTPUT_IN_ATTRIBUTE).evaluateAttributeExpressions(requestFlowFile).getValue(); + if (attributeKey == null) { + attributeKey = RESPONSE_BODY; + } + byte[] outputBuffer; + int size; + + if (outputStreamToRequestAttribute != null) { + outputBuffer = outputStreamToRequestAttribute.getBuffer(); + size = outputStreamToRequestAttribute.size(); + } else { + outputBuffer = new byte[maxAttributeSize]; + size = StreamUtils.fillBuffer(responseBodyStream, outputBuffer, false); + } + String bodyString = new String(outputBuffer, 0, size, getCharsetFromMediaType(responseBody.contentType())); + requestFlowFile = session.putAttribute(requestFlowFile, attributeKey, bodyString); + final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos); - if(requestFlowFile != null) { - session.getProvenanceReporter().fetch(responseFlowFile, url.toExternalForm(), millis); - } else { - session.getProvenanceReporter().receive(responseFlowFile, url.toExternalForm(), millis); - } + session.getProvenanceReporter().modifyAttributes(requestFlowFile, "The " + attributeKey + " has been added. The value of which is the body of a http call to " + + url.toExternalForm() + ". It took " + millis + "millis,"); + } + } finally { + if(outputStreamToRequestAttribute != null){ + outputStreamToRequestAttribute.close(); + outputStreamToRequestAttribute = null; + } + if(teeInputStream != null){ + teeInputStream.close(); + teeInputStream = null; + } else if(responseBodyStream != null){ + responseBodyStream.close(); + responseBodyStream = null; } } - // if not successful and request flowfile is not null, store the response body into a flowfile attribute - if (outputBodyToRequestAttribute && bodyExists) { - String attributeKey = context.getProperty(PROP_PUT_OUTPUT_IN_ATTRIBUTE).evaluateAttributeExpressions(requestFlowFile).getValue(); - if (attributeKey == null) { - attributeKey = RESPONSE_BODY; - } - byte[] outputBuffer; - int size; + route(requestFlowFile, responseFlowFile, session, context, statusCode); - if (outputStreamToRequestAttribute != null) { - outputBuffer = outputStreamToRequestAttribute.getBuffer(); - size = outputStreamToRequestAttribute.size(); - } else { - outputBuffer = new byte[maxAttributeSize]; - size = StreamUtils.fillBuffer(responseBodyStream, outputBuffer, false); - } - String bodyString = new String(outputBuffer, 0, size, getCharsetFromMediaType(responseBody.contentType())); - requestFlowFile = session.putAttribute(requestFlowFile, attributeKey, bodyString); - - final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos); - session.getProvenanceReporter().modifyAttributes(requestFlowFile, "The " + attributeKey + " has been added. The value of which is the body of a http call to " - + url.toExternalForm() + ". It took " + millis + "millis,"); - } - } finally { - if(outputStreamToRequestAttribute != null){ - outputStreamToRequestAttribute.close(); - outputStreamToRequestAttribute = null; - } - if(teeInputStream != null){ - teeInputStream.close(); - teeInputStream = null; - } else if(responseBodyStream != null){ - responseBodyStream.close(); - responseBodyStream = null; - } } - - route(requestFlowFile, responseFlowFile, session, context, statusCode); } catch (final Exception e) { // penalize or yield if (requestFlowFile != null) {