mirror of
https://github.com/apache/nifi.git
synced 2025-02-06 10:08:42 +00:00
NIFI-10: Added FETCH Provenance Event and updated processors to use this new event type
This commit is contained in:
parent
aec32a277c
commit
51f564024a
@ -23,45 +23,66 @@ public enum ProvenanceEventType {
|
||||
* not received from a remote system or external process
|
||||
*/
|
||||
CREATE,
|
||||
|
||||
/**
|
||||
* Indicates a provenance event for receiving data from an external process
|
||||
* Indicates a provenance event for receiving data from an external process. This Event Type
|
||||
* is expected to be the first event for a FlowFile. As such, a Processor that receives data
|
||||
* from an external source and uses that data to replace the content of an existing FlowFile
|
||||
* should use the {@link #FETCH} event type, rather than the RECEIVE event type.
|
||||
*/
|
||||
RECEIVE,
|
||||
|
||||
/**
|
||||
* Indicates that the contents of a FlowFile were overwritten using the contents of some
|
||||
* external resource. This is similar to the {@link #RECEIVE} event but varies in that
|
||||
* RECEIVE events are intended to be used as the event that introduces the FlowFile into
|
||||
* the system, whereas FETCH is used to indicate that the contents of an existing FlowFile
|
||||
* were overwritten.
|
||||
*/
|
||||
FETCH,
|
||||
|
||||
/**
|
||||
* Indicates a provenance event for sending data to an external process
|
||||
*/
|
||||
SEND,
|
||||
|
||||
/**
|
||||
* Indicates a provenance event for the conclusion of an object's life for
|
||||
* some reason other than object expiration
|
||||
*/
|
||||
DROP,
|
||||
|
||||
/**
|
||||
* Indicates a provenance event for the conclusion of an object's life due
|
||||
* to the fact that the object could not be processed in a timely manner
|
||||
*/
|
||||
EXPIRE,
|
||||
|
||||
/**
|
||||
* FORK is used to indicate that one or more FlowFile was derived from a
|
||||
* parent FlowFile.
|
||||
*/
|
||||
FORK,
|
||||
|
||||
/**
|
||||
* JOIN is used to indicate that a single FlowFile is derived from joining
|
||||
* together multiple parent FlowFiles.
|
||||
*/
|
||||
JOIN,
|
||||
|
||||
/**
|
||||
* CLONE is used to indicate that a FlowFile is an exact duplicate of its
|
||||
* parent FlowFile.
|
||||
*/
|
||||
CLONE,
|
||||
|
||||
/**
|
||||
* CONTENT_MODIFIED is used to indicate that a FlowFile's content was
|
||||
* modified in some way. When using this Event Type, it is advisable to
|
||||
* provide details about how the content is modified.
|
||||
*/
|
||||
CONTENT_MODIFIED,
|
||||
|
||||
/**
|
||||
* ATTRIBUTES_MODIFIED is used to indicate that a FlowFile's attributes were
|
||||
* modified in some way. This event is not needed when another event is
|
||||
@ -69,17 +90,20 @@ public enum ProvenanceEventType {
|
||||
* FlowFile attributes.
|
||||
*/
|
||||
ATTRIBUTES_MODIFIED,
|
||||
|
||||
/**
|
||||
* ROUTE is used to show that a FlowFile was routed to a specified
|
||||
* {@link org.apache.nifi.processor.Relationship Relationship} and should provide
|
||||
* information about why the FlowFile was routed to this relationship.
|
||||
*/
|
||||
ROUTE,
|
||||
|
||||
/**
|
||||
* Indicates a provenance event for adding additional information such as a
|
||||
* new linkage to a new URI or UUID
|
||||
*/
|
||||
ADDINFO,
|
||||
|
||||
/**
|
||||
* Indicates a provenance event for replaying a FlowFile. The UUID of the
|
||||
* event will indicate the UUID of the original FlowFile that is being
|
||||
|
@ -123,6 +123,43 @@ public interface ProvenanceReporter {
|
||||
*/
|
||||
void receive(FlowFile flowFile, String transitUri, String sourceSystemFlowFileIdentifier, String details, long transmissionMillis);
|
||||
|
||||
/**
|
||||
* Emits a Provenance Event of type
|
||||
* {@link ProvenanceEventType#FETCH FETCH} that indicates that the content of the given
|
||||
* FlowFile was overwritten with the data received from an external source.
|
||||
*
|
||||
* @param flowFile the FlowFile whose content was replaced
|
||||
* @param transitUri A URI that provides information about the System and
|
||||
* Protocol information over which the transfer occurred.
|
||||
*/
|
||||
void fetch(FlowFile flowFile, String transitUri);
|
||||
|
||||
/**
|
||||
* Emits a Provenance Event of type
|
||||
* {@link ProvenanceEventType#FETCH FETCH} that indicates that the content of the given
|
||||
* FlowFile was overwritten with the data received from an external source.
|
||||
*
|
||||
* @param flowFile the FlowFile whose content was replaced
|
||||
* @param transitUri A URI that provides information about the System and
|
||||
* Protocol information over which the transfer occurred.
|
||||
* @param transmissionMillis the number of milliseconds taken to transfer the data
|
||||
*/
|
||||
void fetch(FlowFile flowFile, String transitUri, long transmissionMillis);
|
||||
|
||||
/**
|
||||
* Emits a Provenance Event of type
|
||||
* {@link ProvenanceEventType#FETCH FETCH} that indicates that the content of the given
|
||||
* FlowFile was overwritten with the data received from an external source.
|
||||
*
|
||||
* @param flowFile the FlowFile whose content was replaced
|
||||
* @param transitUri A URI that provides information about the System and
|
||||
* Protocol information over which the transfer occurred.
|
||||
* @param details details about the event
|
||||
* @param transmissionMillis the number of milliseconds taken to transfer
|
||||
* the data
|
||||
*/
|
||||
void fetch(FlowFile flowFile, String transitUri, String details, long transmissionMillis);
|
||||
|
||||
/**
|
||||
* Emits a Provenance Event of type {@link ProvenanceEventType#SEND SEND}
|
||||
* that indicates that a copy of the given FlowFile was sent to an external
|
||||
|
@ -124,7 +124,40 @@ public class MockProvenanceReporter implements ProvenanceReporter {
|
||||
|
||||
try {
|
||||
final ProvenanceEventRecord record = build(flowFile, ProvenanceEventType.RECEIVE)
|
||||
.setTransitUri(transitUri).setSourceSystemFlowFileIdentifier(sourceSystemFlowFileIdentifier).setEventDuration(transmissionMillis).setDetails(details).build();
|
||||
.setTransitUri(transitUri)
|
||||
.setSourceSystemFlowFileIdentifier(sourceSystemFlowFileIdentifier)
|
||||
.setEventDuration(transmissionMillis)
|
||||
.setDetails(details)
|
||||
.build();
|
||||
events.add(record);
|
||||
} catch (final Exception e) {
|
||||
logger.error("Failed to generate Provenance Event due to " + e);
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.error("", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void fetch(final FlowFile flowFile, final String transitUri) {
|
||||
fetch(flowFile, transitUri, -1L);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void fetch(final FlowFile flowFile, final String transitUri, final long transmissionMillis) {
|
||||
fetch(flowFile, transitUri, null, transmissionMillis);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void fetch(final FlowFile flowFile, final String transitUri, final String details, final long transmissionMillis) {
|
||||
verifyFlowFileKnown(flowFile);
|
||||
|
||||
try {
|
||||
final ProvenanceEventRecord record = build(flowFile, ProvenanceEventType.FETCH)
|
||||
.setTransitUri(transitUri)
|
||||
.setEventDuration(transmissionMillis)
|
||||
.setDetails(details)
|
||||
.build();
|
||||
events.add(record);
|
||||
} catch (final Exception e) {
|
||||
logger.error("Failed to generate Provenance Event due to " + e);
|
||||
|
@ -152,7 +152,7 @@ public class FetchS3Object extends AbstractS3Processor {
|
||||
session.transfer(flowFile, REL_SUCCESS);
|
||||
final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
|
||||
getLogger().info("Successfully retrieved S3 Object for {} in {} millis; routing to success", new Object[]{flowFile, transferMillis});
|
||||
session.getProvenanceReporter().receive(flowFile, "http://" + bucket + ".amazonaws.com/" + key, transferMillis);
|
||||
session.getProvenanceReporter().fetch(flowFile, "http://" + bucket + ".amazonaws.com/" + key, transferMillis);
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -124,7 +124,36 @@ public class StandardProvenanceReporter implements ProvenanceReporter {
|
||||
|
||||
try {
|
||||
final ProvenanceEventRecord record = build(flowFile, ProvenanceEventType.RECEIVE)
|
||||
.setTransitUri(transitUri).setSourceSystemFlowFileIdentifier(sourceSystemFlowFileIdentifier).setEventDuration(transmissionMillis).setDetails(details).build();
|
||||
.setTransitUri(transitUri).setSourceSystemFlowFileIdentifier(sourceSystemFlowFileIdentifier).setEventDuration(transmissionMillis).setDetails(details).build();
|
||||
events.add(record);
|
||||
} catch (final Exception e) {
|
||||
logger.error("Failed to generate Provenance Event due to " + e);
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.error("", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void fetch(final FlowFile flowFile, final String transitUri) {
|
||||
fetch(flowFile, transitUri, -1L);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void fetch(final FlowFile flowFile, final String transitUri, final long transmissionMillis) {
|
||||
fetch(flowFile, transitUri, null, transmissionMillis);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void fetch(final FlowFile flowFile, final String transitUri, final String details, final long transmissionMillis) {
|
||||
verifyFlowFileKnown(flowFile);
|
||||
|
||||
try {
|
||||
final ProvenanceEventRecord record = build(flowFile, ProvenanceEventType.FETCH)
|
||||
.setTransitUri(transitUri)
|
||||
.setEventDuration(transmissionMillis)
|
||||
.setDetails(details)
|
||||
.build();
|
||||
events.add(record);
|
||||
} catch (final Exception e) {
|
||||
logger.error("Failed to generate Provenance Event due to " + e);
|
||||
|
@ -89,7 +89,7 @@ import org.joda.time.format.DateTimeFormatter;
|
||||
@WritesAttribute(attribute = "invokehttp.tx.id", description = "The transaction ID that is returned after reading the response"),
|
||||
@WritesAttribute(attribute = "invokehttp.remote.dn", description = "The DN of the remote server")})
|
||||
@DynamicProperty(name = "Trusted Hostname", value = "A hostname", description = "Bypass the normal truststore hostname verifier to allow the specified (single) remote hostname as trusted "
|
||||
+ "Enabling this property has MITM security implications, use wisely. Only valid with SSL (HTTPS) connections.")
|
||||
+ "Enabling this property has MITM security implications, use wisely. Only valid with SSL (HTTPS) connections.")
|
||||
public final class InvokeHTTP extends AbstractProcessor {
|
||||
|
||||
@Override
|
||||
@ -170,76 +170,75 @@ public final class InvokeHTTP extends AbstractProcessor {
|
||||
// This set includes our strings defined above as well as some standard flowfile
|
||||
// attributes.
|
||||
public static final Set<String> IGNORED_ATTRIBUTES = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
|
||||
STATUS_CODE, STATUS_MESSAGE, RESPONSE_BODY, REQUEST_URL, TRANSACTION_ID, REMOTE_DN,
|
||||
"uuid", "filename", "path"
|
||||
)));
|
||||
STATUS_CODE, STATUS_MESSAGE, RESPONSE_BODY, REQUEST_URL, TRANSACTION_ID, REMOTE_DN,
|
||||
"uuid", "filename", "path")));
|
||||
|
||||
// properties
|
||||
public static final PropertyDescriptor PROP_METHOD = new PropertyDescriptor.Builder()
|
||||
.name("HTTP Method")
|
||||
.description("HTTP request method (GET, POST, PUT, DELETE, HEAD, OPTIONS).")
|
||||
.required(true)
|
||||
.defaultValue("GET")
|
||||
.expressionLanguageSupported(true)
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.build();
|
||||
.name("HTTP Method")
|
||||
.description("HTTP request method (GET, POST, PUT, DELETE, HEAD, OPTIONS).")
|
||||
.required(true)
|
||||
.defaultValue("GET")
|
||||
.expressionLanguageSupported(true)
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor PROP_URL = new PropertyDescriptor.Builder()
|
||||
.name("Remote URL")
|
||||
.description("Remote URL which will be connected to, including scheme, host, port, path.")
|
||||
.required(true)
|
||||
.expressionLanguageSupported(true)
|
||||
.addValidator(StandardValidators.URL_VALIDATOR)
|
||||
.build();
|
||||
.name("Remote URL")
|
||||
.description("Remote URL which will be connected to, including scheme, host, port, path.")
|
||||
.required(true)
|
||||
.expressionLanguageSupported(true)
|
||||
.addValidator(StandardValidators.URL_VALIDATOR)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor PROP_CONNECT_TIMEOUT = new PropertyDescriptor.Builder()
|
||||
.name("Connection Timeout")
|
||||
.description("Max wait time for connection to remote service.")
|
||||
.required(true)
|
||||
.defaultValue("5 secs")
|
||||
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
|
||||
.build();
|
||||
.name("Connection Timeout")
|
||||
.description("Max wait time for connection to remote service.")
|
||||
.required(true)
|
||||
.defaultValue("5 secs")
|
||||
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor PROP_READ_TIMEOUT = new PropertyDescriptor.Builder()
|
||||
.name("Read Timeout")
|
||||
.description("Max wait time for response from remote service.")
|
||||
.required(true)
|
||||
.defaultValue("15 secs")
|
||||
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
|
||||
.build();
|
||||
.name("Read Timeout")
|
||||
.description("Max wait time for response from remote service.")
|
||||
.required(true)
|
||||
.defaultValue("15 secs")
|
||||
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor PROP_DATE_HEADER = new PropertyDescriptor.Builder()
|
||||
.name("Include Date Header")
|
||||
.description("Include an RFC-2616 Date header in the request.")
|
||||
.required(true)
|
||||
.defaultValue("True")
|
||||
.allowableValues("True", "False")
|
||||
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
|
||||
.build();
|
||||
.name("Include Date Header")
|
||||
.description("Include an RFC-2616 Date header in the request.")
|
||||
.required(true)
|
||||
.defaultValue("True")
|
||||
.allowableValues("True", "False")
|
||||
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor PROP_FOLLOW_REDIRECTS = new PropertyDescriptor.Builder()
|
||||
.name("Follow Redirects")
|
||||
.description("Follow HTTP redirects issued by remote server.")
|
||||
.required(true)
|
||||
.defaultValue("True")
|
||||
.allowableValues("True", "False")
|
||||
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
|
||||
.build();
|
||||
.name("Follow Redirects")
|
||||
.description("Follow HTTP redirects issued by remote server.")
|
||||
.required(true)
|
||||
.defaultValue("True")
|
||||
.allowableValues("True", "False")
|
||||
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor PROP_ATTRIBUTES_TO_SEND = new PropertyDescriptor.Builder()
|
||||
.name("Attributes to Send")
|
||||
.description("Regular expression that defines which attributes to send as HTTP headers in the request. "
|
||||
+ "If not defined, no attributes are sent as headers.")
|
||||
.required(false)
|
||||
.addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
|
||||
.build();
|
||||
.name("Attributes to Send")
|
||||
.description("Regular expression that defines which attributes to send as HTTP headers in the request. "
|
||||
+ "If not defined, no attributes are sent as headers.")
|
||||
.required(false)
|
||||
.addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor PROP_SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
|
||||
.name("SSL Context Service")
|
||||
.description("The SSL Context Service used to provide client certificate information for TLS/SSL (https) connections.")
|
||||
.required(false)
|
||||
.identifiesControllerService(SSLContextService.class)
|
||||
.build();
|
||||
.name("SSL Context Service")
|
||||
.description("The SSL Context Service used to provide client certificate information for TLS/SSL (https) connections.")
|
||||
.required(false)
|
||||
.identifiesControllerService(SSLContextService.class)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor PROP_PROXY_HOST = new PropertyDescriptor.Builder()
|
||||
.name("Proxy Host")
|
||||
@ -256,33 +255,33 @@ public final class InvokeHTTP extends AbstractProcessor {
|
||||
.build();
|
||||
|
||||
// Per RFC 7235, 2617, and 2616.
|
||||
// basic-credentials = base64-user-pass
|
||||
// base64-user-pass = userid ":" password
|
||||
// userid = *<TEXT excluding ":">
|
||||
// password = *TEXT
|
||||
// basic-credentials = base64-user-pass
|
||||
// base64-user-pass = userid ":" password
|
||||
// userid = *<TEXT excluding ":">
|
||||
// password = *TEXT
|
||||
//
|
||||
// OCTET = <any 8-bit sequence of data>
|
||||
// CTL = <any US-ASCII control character (octets 0 - 31) and DEL (127)>
|
||||
// LWS = [CRLF] 1*( SP | HT )
|
||||
// TEXT = <any OCTET except CTLs but including LWS>
|
||||
// OCTET = <any 8-bit sequence of data>
|
||||
// CTL = <any US-ASCII control character (octets 0 - 31) and DEL (127)>
|
||||
// LWS = [CRLF] 1*( SP | HT )
|
||||
// TEXT = <any OCTET except CTLs but including LWS>
|
||||
//
|
||||
// Per RFC 7230, username & password in URL are now disallowed in HTTP and HTTPS URIs.
|
||||
public static final PropertyDescriptor PROP_BASIC_AUTH_USERNAME = new PropertyDescriptor.Builder()
|
||||
.name("Basic Authentication Username")
|
||||
.displayName("Basic Authentication Username")
|
||||
.description("The username to be used by the client to authenticate against the Remote URL. Cannot include control characters (0-31), ':', or DEL (127).")
|
||||
.required(false)
|
||||
.addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile("^[\\x20-\\x39\\x3b-\\x7e\\x80-\\xff]+$")))
|
||||
.build();
|
||||
.name("Basic Authentication Username")
|
||||
.displayName("Basic Authentication Username")
|
||||
.description("The username to be used by the client to authenticate against the Remote URL. Cannot include control characters (0-31), ':', or DEL (127).")
|
||||
.required(false)
|
||||
.addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile("^[\\x20-\\x39\\x3b-\\x7e\\x80-\\xff]+$")))
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor PROP_BASIC_AUTH_PASSWORD = new PropertyDescriptor.Builder()
|
||||
.name("Basic Authentication Password")
|
||||
.displayName("Basic Authentication Password")
|
||||
.description("The password to be used by the client to authenticate against the Remote URL.")
|
||||
.required(false)
|
||||
.sensitive(true)
|
||||
.addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile("^[\\x20-\\x7e\\x80-\\xff]+$")))
|
||||
.build();
|
||||
.name("Basic Authentication Password")
|
||||
.displayName("Basic Authentication Password")
|
||||
.description("The password to be used by the client to authenticate against the Remote URL.")
|
||||
.required(false)
|
||||
.sensitive(true)
|
||||
.addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile("^[\\x20-\\x7e\\x80-\\xff]+$")))
|
||||
.build();
|
||||
|
||||
public static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(
|
||||
PROP_METHOD,
|
||||
@ -296,48 +295,46 @@ public final class InvokeHTTP extends AbstractProcessor {
|
||||
PROP_BASIC_AUTH_USERNAME,
|
||||
PROP_BASIC_AUTH_PASSWORD,
|
||||
PROP_PROXY_HOST,
|
||||
PROP_PROXY_PORT
|
||||
));
|
||||
PROP_PROXY_PORT));
|
||||
|
||||
// property to allow the hostname verifier to be overridden
|
||||
// this is a "hidden" property - it's configured using a dynamic user property
|
||||
public static final PropertyDescriptor PROP_TRUSTED_HOSTNAME = new PropertyDescriptor.Builder()
|
||||
.name("Trusted Hostname")
|
||||
.description("Bypass the normal truststore hostname verifier to allow the specified (single) remote hostname as trusted "
|
||||
+ "Enabling this property has MITM security implications, use wisely. Only valid with SSL (HTTPS) connections.")
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.dynamic(true)
|
||||
.build();
|
||||
.name("Trusted Hostname")
|
||||
.description("Bypass the normal truststore hostname verifier to allow the specified (single) remote hostname as trusted "
|
||||
+ "Enabling this property has MITM security implications, use wisely. Only valid with SSL (HTTPS) connections.")
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.dynamic(true)
|
||||
.build();
|
||||
|
||||
// relationships
|
||||
public static final Relationship REL_SUCCESS_REQ = new Relationship.Builder()
|
||||
.name("Original")
|
||||
.description("Original FlowFile will be routed upon success (2xx status codes).")
|
||||
.build();
|
||||
.name("Original")
|
||||
.description("Original FlowFile will be routed upon success (2xx status codes).")
|
||||
.build();
|
||||
|
||||
public static final Relationship REL_SUCCESS_RESP = new Relationship.Builder()
|
||||
.name("Response")
|
||||
.description("Response FlowFile will be routed upon success (2xx status codes).")
|
||||
.build();
|
||||
.name("Response")
|
||||
.description("Response FlowFile will be routed upon success (2xx status codes).")
|
||||
.build();
|
||||
|
||||
public static final Relationship REL_RETRY = new Relationship.Builder()
|
||||
.name("Retry")
|
||||
.description("FlowFile will be routed on any status code that can be retried (5xx status codes).")
|
||||
.build();
|
||||
.name("Retry")
|
||||
.description("FlowFile will be routed on any status code that can be retried (5xx status codes).")
|
||||
.build();
|
||||
|
||||
public static final Relationship REL_NO_RETRY = new Relationship.Builder()
|
||||
.name("No Retry")
|
||||
.description("FlowFile will be routed on any status code that should NOT be retried (1xx, 3xx, 4xx status codes).")
|
||||
.build();
|
||||
.name("No Retry")
|
||||
.description("FlowFile will be routed on any status code that should NOT be retried (1xx, 3xx, 4xx status codes).")
|
||||
.build();
|
||||
|
||||
public static final Relationship REL_FAILURE = new Relationship.Builder()
|
||||
.name("Failure")
|
||||
.description("FlowFile will be routed on any type of connection failure, timeout or general exception.")
|
||||
.build();
|
||||
.name("Failure")
|
||||
.description("FlowFile will be routed on any type of connection failure, timeout or general exception.")
|
||||
.build();
|
||||
|
||||
public static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
|
||||
REL_SUCCESS_REQ, REL_SUCCESS_RESP, REL_RETRY, REL_NO_RETRY, REL_FAILURE
|
||||
)));
|
||||
REL_SUCCESS_REQ, REL_SUCCESS_RESP, REL_RETRY, REL_NO_RETRY, REL_FAILURE)));
|
||||
|
||||
}
|
||||
|
||||
@ -403,7 +400,7 @@ public final class InvokeHTTP extends AbstractProcessor {
|
||||
transfer();
|
||||
} catch (final Exception e) {
|
||||
// log exception
|
||||
logger.error("Routing to {} due to exception: {}", new Object[] { REL_FAILURE.getName(), e }, e);
|
||||
logger.error("Routing to {} due to exception: {}", new Object[] {REL_FAILURE.getName(), e}, e);
|
||||
|
||||
// penalize
|
||||
request = session.penalize(request);
|
||||
@ -417,7 +414,7 @@ public final class InvokeHTTP extends AbstractProcessor {
|
||||
session.remove(response);
|
||||
}
|
||||
} catch (final Exception e1) {
|
||||
logger.error("Could not cleanup response flowfile due to exception: {}", new Object[] { e1 }, e1);
|
||||
logger.error("Could not cleanup response flowfile due to exception: {}", new Object[] {e1}, e1);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -545,7 +542,7 @@ public final class InvokeHTTP extends AbstractProcessor {
|
||||
|
||||
// emit provenance event
|
||||
final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
|
||||
session.getProvenanceReporter().modifyContent(response, "Updated content with data received from " + conn.getURL().toExternalForm(), millis);
|
||||
session.getProvenanceReporter().fetch(response, conn.getURL().toExternalForm(), millis);
|
||||
}
|
||||
|
||||
}
|
||||
@ -562,7 +559,7 @@ public final class InvokeHTTP extends AbstractProcessor {
|
||||
|
||||
// log the status codes from the response
|
||||
logger.info("Request to {} returned status code {} for {}",
|
||||
new Object[]{conn.getURL().toExternalForm(), statusCode, request});
|
||||
new Object[] {conn.getURL().toExternalForm(), statusCode, request});
|
||||
|
||||
// transfer to the correct relationship
|
||||
// 2xx -> SUCCESS
|
||||
@ -660,12 +657,12 @@ public final class InvokeHTTP extends AbstractProcessor {
|
||||
|
||||
private void logRequest() {
|
||||
logger.debug("\nRequest to remote service:\n\t{}\n{}",
|
||||
new Object[]{conn.getURL().toExternalForm(), getLogString(conn.getRequestProperties())});
|
||||
new Object[] {conn.getURL().toExternalForm(), getLogString(conn.getRequestProperties())});
|
||||
}
|
||||
|
||||
private void logResponse() {
|
||||
logger.debug("\nResponse from remote service:\n\t{}\n{}",
|
||||
new Object[]{conn.getURL().toExternalForm(), getLogString(conn.getHeaderFields())});
|
||||
new Object[] {conn.getURL().toExternalForm(), getLogString(conn.getHeaderFields())});
|
||||
}
|
||||
|
||||
private String getLogString(Map<String, List<String>> map) {
|
||||
@ -753,7 +750,7 @@ public final class InvokeHTTP extends AbstractProcessor {
|
||||
return new BufferedInputStream(is);
|
||||
|
||||
} catch (IOException e) {
|
||||
logger.warn("Response stream threw an exception: {}", new Object[]{e}, e);
|
||||
logger.warn("Response stream threw an exception: {}", new Object[] {e}, e);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
@ -149,8 +149,8 @@ public class TestInvokeHTTP {
|
||||
runner.assertTransferCount(Config.REL_NO_RETRY, 0);
|
||||
runner.assertTransferCount(Config.REL_FAILURE, 0);
|
||||
|
||||
//expected in request status.code and status.message
|
||||
//original flow file (+attributes)??????????
|
||||
// expected in request status.code and status.message
|
||||
// original flow file (+attributes)??????????
|
||||
final MockFlowFile bundle = runner.getFlowFilesForRelationship(Config.REL_SUCCESS_REQ).get(0);
|
||||
bundle.assertAttributeEquals(Config.STATUS_CODE, "200");
|
||||
bundle.assertAttributeEquals(Config.STATUS_MESSAGE, "OK");
|
||||
@ -159,10 +159,10 @@ public class TestInvokeHTTP {
|
||||
Assert.assertEquals(expected, actual);
|
||||
bundle.assertAttributeEquals("Foo", "Bar");
|
||||
|
||||
//expected in response
|
||||
//status code, status message, all headers from server response --> ff attributes
|
||||
//server response message body into payload of ff
|
||||
//should not contain any original ff attributes
|
||||
// expected in response
|
||||
// status code, status message, all headers from server response --> ff attributes
|
||||
// server response message body into payload of ff
|
||||
// should not contain any original ff attributes
|
||||
final MockFlowFile bundle1 = runner.getFlowFilesForRelationship(Config.REL_SUCCESS_RESP).get(0);
|
||||
bundle1.assertContentEquals("/status/200".getBytes("UTF-8"));
|
||||
bundle1.assertAttributeEquals(Config.STATUS_CODE, "200");
|
||||
@ -198,8 +198,8 @@ public class TestInvokeHTTP {
|
||||
runner.assertTransferCount(Config.REL_NO_RETRY, 0);
|
||||
runner.assertTransferCount(Config.REL_FAILURE, 0);
|
||||
|
||||
//expected in request status.code and status.message
|
||||
//original flow file (+attributes)??????????
|
||||
// expected in request status.code and status.message
|
||||
// original flow file (+attributes)??????????
|
||||
final MockFlowFile bundle = runner.getFlowFilesForRelationship(Config.REL_SUCCESS_REQ).get(0);
|
||||
bundle.assertAttributeEquals(Config.STATUS_CODE, "200");
|
||||
bundle.assertAttributeEquals(Config.STATUS_MESSAGE, "OK");
|
||||
@ -208,10 +208,10 @@ public class TestInvokeHTTP {
|
||||
final String expected = "Hello";
|
||||
Assert.assertEquals(expected, actual);
|
||||
|
||||
//expected in response
|
||||
//status code, status message, all headers from server response --> ff attributes
|
||||
//server response message body into payload of ff
|
||||
//should not contain any original ff attributes
|
||||
// expected in response
|
||||
// status code, status message, all headers from server response --> ff attributes
|
||||
// server response message body into payload of ff
|
||||
// should not contain any original ff attributes
|
||||
final MockFlowFile bundle1 = runner.getFlowFilesForRelationship(Config.REL_SUCCESS_RESP).get(0);
|
||||
final String bundle1Content = new String(bundle1.toByteArray(), StandardCharsets.UTF_8);
|
||||
assertTrue(bundle1Content.startsWith(expAuth)); // use startsWith instead of equals so we can ignore line endings
|
||||
@ -223,17 +223,17 @@ public class TestInvokeHTTP {
|
||||
final List<ProvenanceEventRecord> provEvents = runner.getProvenanceEvents();
|
||||
assertEquals(2, provEvents.size());
|
||||
boolean forkEvent = false;
|
||||
boolean contentModEvent = false;
|
||||
boolean fetchEvent = false;
|
||||
for (final ProvenanceEventRecord event : provEvents) {
|
||||
if (event.getEventType() == ProvenanceEventType.FORK) {
|
||||
forkEvent = true;
|
||||
} else if (event.getEventType() == ProvenanceEventType.CONTENT_MODIFIED) {
|
||||
contentModEvent = true;
|
||||
} else if (event.getEventType() == ProvenanceEventType.FETCH) {
|
||||
fetchEvent = true;
|
||||
}
|
||||
}
|
||||
|
||||
assertTrue(forkEvent);
|
||||
assertTrue(contentModEvent);
|
||||
assertTrue(fetchEvent);
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -257,8 +257,8 @@ public class TestInvokeHTTP {
|
||||
runner.assertTransferCount(Config.REL_NO_RETRY, 1);
|
||||
runner.assertTransferCount(Config.REL_FAILURE, 0);
|
||||
|
||||
//expected in request status.code and status.message
|
||||
//original flow file (+attributes)??????????
|
||||
// expected in request status.code and status.message
|
||||
// original flow file (+attributes)??????????
|
||||
final MockFlowFile bundle = runner.getFlowFilesForRelationship(Config.REL_NO_RETRY).get(0);
|
||||
bundle.assertAttributeEquals(Config.STATUS_CODE, "401");
|
||||
bundle.assertAttributeEquals(Config.STATUS_MESSAGE, "Unauthorized");
|
||||
@ -286,7 +286,7 @@ public class TestInvokeHTTP {
|
||||
runner.assertTransferCount(Config.REL_NO_RETRY, 0);
|
||||
runner.assertTransferCount(Config.REL_FAILURE, 0);
|
||||
|
||||
//expected in response
|
||||
// expected in response
|
||||
final MockFlowFile bundle = runner.getFlowFilesForRelationship(Config.REL_RETRY).get(0);
|
||||
final String actual = new String(bundle.toByteArray(), StandardCharsets.UTF_8);
|
||||
bundle.assertAttributeEquals(Config.STATUS_CODE, "500");
|
||||
@ -313,8 +313,8 @@ public class TestInvokeHTTP {
|
||||
runner.assertTransferCount(Config.REL_RETRY, 0);
|
||||
runner.assertTransferCount(Config.REL_NO_RETRY, 1);
|
||||
runner.assertTransferCount(Config.REL_FAILURE, 0);
|
||||
//getMyFlowFiles();
|
||||
//expected in response
|
||||
// getMyFlowFiles();
|
||||
// expected in response
|
||||
final MockFlowFile bundle = runner.getFlowFilesForRelationship(Config.REL_NO_RETRY).get(0);
|
||||
final String actual = new String(bundle.toByteArray(), StandardCharsets.UTF_8);
|
||||
|
||||
@ -340,8 +340,8 @@ public class TestInvokeHTTP {
|
||||
runner.assertTransferCount(Config.REL_RETRY, 0);
|
||||
runner.assertTransferCount(Config.REL_NO_RETRY, 1);
|
||||
runner.assertTransferCount(Config.REL_FAILURE, 0);
|
||||
//getMyFlowFiles();
|
||||
//expected in response
|
||||
// getMyFlowFiles();
|
||||
// expected in response
|
||||
final MockFlowFile bundle = runner.getFlowFilesForRelationship(Config.REL_NO_RETRY).get(0);
|
||||
final String actual = new String(bundle.toByteArray(), StandardCharsets.UTF_8);
|
||||
|
||||
@ -367,8 +367,8 @@ public class TestInvokeHTTP {
|
||||
runner.assertTransferCount(Config.REL_RETRY, 0);
|
||||
runner.assertTransferCount(Config.REL_NO_RETRY, 1);
|
||||
runner.assertTransferCount(Config.REL_FAILURE, 0);
|
||||
//getMyFlowFiles();
|
||||
//expected in response
|
||||
// getMyFlowFiles();
|
||||
// expected in response
|
||||
final MockFlowFile bundle = runner.getFlowFilesForRelationship(Config.REL_NO_RETRY).get(0);
|
||||
final String actual = new String(bundle.toByteArray(), StandardCharsets.UTF_8);
|
||||
|
||||
@ -397,7 +397,7 @@ public class TestInvokeHTTP {
|
||||
runner.assertTransferCount(Config.REL_NO_RETRY, 1);
|
||||
runner.assertTransferCount(Config.REL_FAILURE, 0);
|
||||
|
||||
//expected in response
|
||||
// expected in response
|
||||
final MockFlowFile bundle = runner.getFlowFilesForRelationship(Config.REL_NO_RETRY).get(0);
|
||||
final String actual = new String(bundle.toByteArray(), StandardCharsets.UTF_8);
|
||||
|
||||
@ -593,7 +593,7 @@ public class TestInvokeHTTP {
|
||||
|
||||
@Override
|
||||
public void handle(String target, Request baseRequest,
|
||||
HttpServletRequest request, HttpServletResponse response)
|
||||
HttpServletRequest request, HttpServletResponse response)
|
||||
throws IOException, ServletException {
|
||||
|
||||
baseRequest.setHandled(true);
|
||||
|
Loading…
x
Reference in New Issue
Block a user