NIFI-5085 In InvokeHttp, moving the OkHttp Response object to a try with resources

This commit is contained in:
Joe Percivall 2018-04-16 22:36:26 -04:00 committed by Matthew Burgess
parent 6d804ee1fe
commit a0c9bebe24
1 changed files with 110 additions and 108 deletions

View File

@ -779,133 +779,135 @@ public final class InvokeHTTP extends AbstractProcessor {
}
final long startNanos = System.nanoTime();
Response responseHttp = okHttpClient.newCall(httpRequest).execute();
// output the raw response headers (DEBUG level only)
logResponse(logger, url, responseHttp);
try (Response responseHttp = okHttpClient.newCall(httpRequest).execute()) {
// output the raw response headers (DEBUG level only)
logResponse(logger, url, responseHttp);
// store the status code and message
int statusCode = responseHttp.code();
String statusMessage = responseHttp.message();
// store the status code and message
int statusCode = responseHttp.code();
String statusMessage = responseHttp.message();
if (statusCode == 0) {
throw new IllegalStateException("Status code unknown, connection hasn't been attempted.");
}
// Create a map of the status attributes that are always written to the request and response FlowFiles
Map<String, String> statusAttributes = new HashMap<>();
statusAttributes.put(STATUS_CODE, String.valueOf(statusCode));
statusAttributes.put(STATUS_MESSAGE, statusMessage);
statusAttributes.put(REQUEST_URL, url.toExternalForm());
statusAttributes.put(TRANSACTION_ID, txId.toString());
if (requestFlowFile != null) {
requestFlowFile = session.putAllAttributes(requestFlowFile, statusAttributes);
}
// If the property to add the response headers to the request flowfile is true then add them
if (context.getProperty(PROP_ADD_HEADERS_TO_REQUEST).asBoolean() && requestFlowFile != null) {
// write the response headers as attributes
// this will overwrite any existing flowfile attributes
requestFlowFile = session.putAllAttributes(requestFlowFile, convertAttributesFromHeaders(url, responseHttp));
}
boolean outputBodyToRequestAttribute = (!isSuccess(statusCode) || putToAttribute) && requestFlowFile != null;
boolean outputBodyToResponseContent = (isSuccess(statusCode) && !putToAttribute) || context.getProperty(PROP_OUTPUT_RESPONSE_REGARDLESS).asBoolean();
ResponseBody responseBody = responseHttp.body();
boolean bodyExists = responseBody != null;
InputStream responseBodyStream = null;
SoftLimitBoundedByteArrayOutputStream outputStreamToRequestAttribute = null;
TeeInputStream teeInputStream = null;
try {
responseBodyStream = bodyExists ? responseBody.byteStream() : null;
if (responseBodyStream != null && outputBodyToRequestAttribute && outputBodyToResponseContent) {
outputStreamToRequestAttribute = new SoftLimitBoundedByteArrayOutputStream(maxAttributeSize);
teeInputStream = new TeeInputStream(responseBodyStream, outputStreamToRequestAttribute);
if (statusCode == 0) {
throw new IllegalStateException("Status code unknown, connection hasn't been attempted.");
}
if (outputBodyToResponseContent) {
/*
* If successful and putting to response flowfile, store the response body as the flowfile payload
* we include additional flowfile attributes including the response headers and the status codes.
*/
// Create a map of the status attributes that are always written to the request and response FlowFiles
Map<String, String> statusAttributes = new HashMap<>();
statusAttributes.put(STATUS_CODE, String.valueOf(statusCode));
statusAttributes.put(STATUS_MESSAGE, statusMessage);
statusAttributes.put(REQUEST_URL, url.toExternalForm());
statusAttributes.put(TRANSACTION_ID, txId.toString());
// clone the flowfile to capture the response
if (requestFlowFile != null) {
responseFlowFile = session.create(requestFlowFile);
} else {
responseFlowFile = session.create();
}
// write attributes to response flowfile
responseFlowFile = session.putAllAttributes(responseFlowFile, statusAttributes);
if (requestFlowFile != null) {
requestFlowFile = session.putAllAttributes(requestFlowFile, statusAttributes);
}
// If the property to add the response headers to the request flowfile is true then add them
if (context.getProperty(PROP_ADD_HEADERS_TO_REQUEST).asBoolean() && requestFlowFile != null) {
// write the response headers as attributes
// this will overwrite any existing flowfile attributes
responseFlowFile = session.putAllAttributes(responseFlowFile, convertAttributesFromHeaders(url, responseHttp));
requestFlowFile = session.putAllAttributes(requestFlowFile, convertAttributesFromHeaders(url, responseHttp));
}
// transfer the message body to the payload
// can potentially be null in edge cases
if (bodyExists) {
// write content type attribute to response flowfile if it is available
if (responseBody.contentType() != null) {
responseFlowFile = session.putAttribute(responseFlowFile, CoreAttributes.MIME_TYPE.key(), responseBody.contentType().toString());
}
if (teeInputStream != null) {
responseFlowFile = session.importFrom(teeInputStream, responseFlowFile);
boolean outputBodyToRequestAttribute = (!isSuccess(statusCode) || putToAttribute) && requestFlowFile != null;
boolean outputBodyToResponseContent = (isSuccess(statusCode) && !putToAttribute) || context.getProperty(PROP_OUTPUT_RESPONSE_REGARDLESS).asBoolean();
ResponseBody responseBody = responseHttp.body();
boolean bodyExists = responseBody != null;
InputStream responseBodyStream = null;
SoftLimitBoundedByteArrayOutputStream outputStreamToRequestAttribute = null;
TeeInputStream teeInputStream = null;
try {
responseBodyStream = bodyExists ? responseBody.byteStream() : null;
if (responseBodyStream != null && outputBodyToRequestAttribute && outputBodyToResponseContent) {
outputStreamToRequestAttribute = new SoftLimitBoundedByteArrayOutputStream(maxAttributeSize);
teeInputStream = new TeeInputStream(responseBodyStream, outputStreamToRequestAttribute);
}
if (outputBodyToResponseContent) {
/*
* If successful and putting to response flowfile, store the response body as the flowfile payload
* we include additional flowfile attributes including the response headers and the status codes.
*/
// clone the flowfile to capture the response
if (requestFlowFile != null) {
responseFlowFile = session.create(requestFlowFile);
} else {
responseFlowFile = session.importFrom(responseBodyStream, responseFlowFile);
responseFlowFile = session.create();
}
// emit provenance event
// write attributes to response flowfile
responseFlowFile = session.putAllAttributes(responseFlowFile, statusAttributes);
// write the response headers as attributes
// this will overwrite any existing flowfile attributes
responseFlowFile = session.putAllAttributes(responseFlowFile, convertAttributesFromHeaders(url, responseHttp));
// transfer the message body to the payload
// can potentially be null in edge cases
if (bodyExists) {
// write content type attribute to response flowfile if it is available
if (responseBody.contentType() != null) {
responseFlowFile = session.putAttribute(responseFlowFile, CoreAttributes.MIME_TYPE.key(), responseBody.contentType().toString());
}
if (teeInputStream != null) {
responseFlowFile = session.importFrom(teeInputStream, responseFlowFile);
} else {
responseFlowFile = session.importFrom(responseBodyStream, responseFlowFile);
}
// emit provenance event
final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
if(requestFlowFile != null) {
session.getProvenanceReporter().fetch(responseFlowFile, url.toExternalForm(), millis);
} else {
session.getProvenanceReporter().receive(responseFlowFile, url.toExternalForm(), millis);
}
}
}
// if not successful and request flowfile is not null, store the response body into a flowfile attribute
if (outputBodyToRequestAttribute && bodyExists) {
String attributeKey = context.getProperty(PROP_PUT_OUTPUT_IN_ATTRIBUTE).evaluateAttributeExpressions(requestFlowFile).getValue();
if (attributeKey == null) {
attributeKey = RESPONSE_BODY;
}
byte[] outputBuffer;
int size;
if (outputStreamToRequestAttribute != null) {
outputBuffer = outputStreamToRequestAttribute.getBuffer();
size = outputStreamToRequestAttribute.size();
} else {
outputBuffer = new byte[maxAttributeSize];
size = StreamUtils.fillBuffer(responseBodyStream, outputBuffer, false);
}
String bodyString = new String(outputBuffer, 0, size, getCharsetFromMediaType(responseBody.contentType()));
requestFlowFile = session.putAttribute(requestFlowFile, attributeKey, bodyString);
final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
if(requestFlowFile != null) {
session.getProvenanceReporter().fetch(responseFlowFile, url.toExternalForm(), millis);
} else {
session.getProvenanceReporter().receive(responseFlowFile, url.toExternalForm(), millis);
}
session.getProvenanceReporter().modifyAttributes(requestFlowFile, "The " + attributeKey + " has been added. The value of which is the body of a http call to "
+ url.toExternalForm() + ". It took " + millis + "millis,");
}
} finally {
if(outputStreamToRequestAttribute != null){
outputStreamToRequestAttribute.close();
outputStreamToRequestAttribute = null;
}
if(teeInputStream != null){
teeInputStream.close();
teeInputStream = null;
} else if(responseBodyStream != null){
responseBodyStream.close();
responseBodyStream = null;
}
}
// if not successful and request flowfile is not null, store the response body into a flowfile attribute
if (outputBodyToRequestAttribute && bodyExists) {
String attributeKey = context.getProperty(PROP_PUT_OUTPUT_IN_ATTRIBUTE).evaluateAttributeExpressions(requestFlowFile).getValue();
if (attributeKey == null) {
attributeKey = RESPONSE_BODY;
}
byte[] outputBuffer;
int size;
route(requestFlowFile, responseFlowFile, session, context, statusCode);
if (outputStreamToRequestAttribute != null) {
outputBuffer = outputStreamToRequestAttribute.getBuffer();
size = outputStreamToRequestAttribute.size();
} else {
outputBuffer = new byte[maxAttributeSize];
size = StreamUtils.fillBuffer(responseBodyStream, outputBuffer, false);
}
String bodyString = new String(outputBuffer, 0, size, getCharsetFromMediaType(responseBody.contentType()));
requestFlowFile = session.putAttribute(requestFlowFile, attributeKey, bodyString);
final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
session.getProvenanceReporter().modifyAttributes(requestFlowFile, "The " + attributeKey + " has been added. The value of which is the body of a http call to "
+ url.toExternalForm() + ". It took " + millis + "millis,");
}
} finally {
if(outputStreamToRequestAttribute != null){
outputStreamToRequestAttribute.close();
outputStreamToRequestAttribute = null;
}
if(teeInputStream != null){
teeInputStream.close();
teeInputStream = null;
} else if(responseBodyStream != null){
responseBodyStream.close();
responseBodyStream = null;
}
}
route(requestFlowFile, responseFlowFile, session, context, statusCode);
} catch (final Exception e) {
// penalize or yield
if (requestFlowFile != null) {