From 9064b976317e316f42ac279dd026105b54a17ddb Mon Sep 17 00:00:00 2001 From: Pierre Villard Date: Thu, 28 Apr 2016 18:59:42 +0200 Subject: [PATCH] NIFI-1816 Added provenance event SEND in HandleHttpResponse This closes #389 --- .../standard/HandleHttpRequest.java | 26 ++++++------ .../standard/HandleHttpResponse.java | 24 ++++++++--- .../processors/standard/util/HTTPUtils.java | 42 +++++++++++++++++++ .../standard/TestHandleHttpResponse.java | 14 ++++++- 4 files changed, 86 insertions(+), 20 deletions(-) create mode 100644 nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/HTTPUtils.java diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HandleHttpRequest.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HandleHttpRequest.java index b0e91b4824..f3b065a023 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HandleHttpRequest.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HandleHttpRequest.java @@ -62,6 +62,7 @@ import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.standard.util.HTTPUtils; import org.apache.nifi.ssl.SSLContextService; import org.eclipse.jetty.server.Connector; import org.eclipse.jetty.server.HttpConfiguration; @@ -80,20 +81,22 @@ import com.sun.jersey.api.client.ClientResponse.Status; @CapabilityDescription("Starts an HTTP Server and listens for HTTP Requests. For each request, creates a FlowFile and transfers to 'success'. " + "This Processor is designed to be used in conjunction with the HandleHttpResponse Processor in order to create a Web Service") @WritesAttributes({ - @WritesAttribute(attribute = "http.context.identifier", description = "An identifier that allows the HandleHttpRequest and HandleHttpResponse " + @WritesAttribute(attribute = HTTPUtils.HTTP_CONTEXT_ID, description = "An identifier that allows the HandleHttpRequest and HandleHttpResponse " + "to coordinate which FlowFile belongs to which HTTP Request/Response."), @WritesAttribute(attribute = "mime.type", description = "The MIME Type of the data, according to the HTTP Header \"Content-Type\""), @WritesAttribute(attribute = "http.servlet.path", description = "The part of the request URL that is considered the Servlet Path"), @WritesAttribute(attribute = "http.context.path", description = "The part of the request URL that is considered to be the Context Path"), @WritesAttribute(attribute = "http.method", description = "The HTTP Method that was used for the request, such as GET or POST"), + @WritesAttribute(attribute = HTTPUtils.HTTP_LOCAL_NAME, description = "IP address/hostname of the server"), + @WritesAttribute(attribute = HTTPUtils.HTTP_PORT, description = "Listening port of the server"), @WritesAttribute(attribute = "http.query.string", description = "The query string portion of hte Request URL"), - @WritesAttribute(attribute = "http.remote.host", description = "The hostname of the requestor"), + @WritesAttribute(attribute = HTTPUtils.HTTP_REMOTE_HOST, description = "The hostname of the requestor"), @WritesAttribute(attribute = "http.remote.addr", description = "The hostname:port combination of the requestor"), @WritesAttribute(attribute = "http.remote.user", description = "The username of the requestor"), - @WritesAttribute(attribute = "http.request.uri", description = "The full Request URL"), + @WritesAttribute(attribute = HTTPUtils.HTTP_REQUEST_URI, description = "The full Request URL"), @WritesAttribute(attribute = "http.auth.type", description = "The type of HTTP Authorization used"), @WritesAttribute(attribute = "http.principal.name", description = "The name of the authenticated user making the request"), - @WritesAttribute(attribute = "http.subject.dn", description = "The Distinguished Name of the requestor. This value will not be populated " + @WritesAttribute(attribute = HTTPUtils.HTTP_SSL_CERT, description = "The Distinguished Name of the requestor. This value will not be populated " + "unless the Processor is configured to use an SSLContext Service"), @WritesAttribute(attribute = "http.issuer.dn", description = "The Distinguished Name of the entity that issued the Subject's certificate. " + "This value will not be populated unless the Processor is configured to use an SSLContext Service"), @@ -104,7 +107,6 @@ import com.sun.jersey.api.client.ClientResponse.Status; classNames = {"org.apache.nifi.http.StandardHttpContextMap", "org.apache.nifi.ssl.StandardSSLContextService"}) public class HandleHttpRequest extends AbstractProcessor { - public static final String HTTP_CONTEXT_ID = "http.context.identifier"; private static final Pattern URL_QUERY_PARAM_DELIMITER = Pattern.compile("&"); // Allowable values for client auth @@ -493,20 +495,20 @@ public class HandleHttpRequest extends AbstractProcessor { final String contextIdentifier = UUID.randomUUID().toString(); final Map attributes = new HashMap<>(); try { - putAttribute(attributes, HTTP_CONTEXT_ID, contextIdentifier); + putAttribute(attributes, HTTPUtils.HTTP_CONTEXT_ID, contextIdentifier); putAttribute(attributes, "mime.type", request.getContentType()); putAttribute(attributes, "http.servlet.path", request.getServletPath()); putAttribute(attributes, "http.context.path", request.getContextPath()); putAttribute(attributes, "http.method", request.getMethod()); putAttribute(attributes, "http.local.addr", request.getLocalAddr()); - putAttribute(attributes, "http.local.name", request.getLocalName()); + putAttribute(attributes, HTTPUtils.HTTP_LOCAL_NAME, request.getLocalName()); if (request.getQueryString() != null) { putAttribute(attributes, "http.query.string", URLDecoder.decode(request.getQueryString(), charset)); } - putAttribute(attributes, "http.remote.host", request.getRemoteHost()); + putAttribute(attributes, HTTPUtils.HTTP_REMOTE_HOST, request.getRemoteHost()); putAttribute(attributes, "http.remote.addr", request.getRemoteAddr()); putAttribute(attributes, "http.remote.user", request.getRemoteUser()); - putAttribute(attributes, "http.request.uri", request.getRequestURI()); + putAttribute(attributes, HTTPUtils.HTTP_REQUEST_URI, request.getRequestURI()); putAttribute(attributes, "http.request.url", request.getRequestURL().toString()); putAttribute(attributes, "http.auth.type", request.getAuthType()); @@ -517,7 +519,7 @@ public class HandleHttpRequest extends AbstractProcessor { putAttribute(attributes, "http.character.encoding", request.getCharacterEncoding()); putAttribute(attributes, "http.locale", request.getLocale()); putAttribute(attributes, "http.server.name", request.getServerName()); - putAttribute(attributes, "http.server.port", request.getServerPort()); + putAttribute(attributes, HTTPUtils.HTTP_PORT, request.getServerPort()); final Enumeration paramEnumeration = request.getParameterNames(); while (paramEnumeration.hasMoreElements()) { @@ -585,7 +587,7 @@ public class HandleHttpRequest extends AbstractProcessor { subjectDn = cert.getSubjectDN().getName(); final String issuerDn = cert.getIssuerDN().getName(); - putAttribute(attributes, "http.subject.dn", subjectDn); + putAttribute(attributes, HTTPUtils.HTTP_SSL_CERT, subjectDn); putAttribute(attributes, "http.issuer.dn", issuerDn); } else { subjectDn = null; @@ -613,7 +615,7 @@ public class HandleHttpRequest extends AbstractProcessor { } final long receiveMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start); - session.getProvenanceReporter().receive(flowFile, request.getRequestURI(), "Received from " + request.getRemoteAddr() + (subjectDn == null ? "" : " with DN=" + subjectDn), receiveMillis); + session.getProvenanceReporter().receive(flowFile, HTTPUtils.getURI(attributes), "Received from " + request.getRemoteAddr() + (subjectDn == null ? "" : " with DN=" + subjectDn), receiveMillis); session.transfer(flowFile, REL_SUCCESS); getLogger().info("Transferring {} to 'success'; received from {}", new Object[]{flowFile, request.getRemoteAddr()}); } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HandleHttpResponse.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HandleHttpResponse.java index 1a24e6e027..e15abcc3b8 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HandleHttpResponse.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HandleHttpResponse.java @@ -21,6 +21,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.TimeUnit; import java.util.regex.Pattern; import javax.servlet.http.HttpServletResponse; @@ -29,6 +30,7 @@ import org.apache.nifi.annotation.behavior.DynamicProperty; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.ReadsAttribute; +import org.apache.nifi.annotation.behavior.ReadsAttributes; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.Tags; @@ -41,19 +43,26 @@ import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.standard.util.HTTPUtils; +import org.apache.nifi.util.StopWatch; @InputRequirement(Requirement.INPUT_REQUIRED) @Tags({"http", "https", "response", "egress", "web service"}) @CapabilityDescription("Sends an HTTP Response to the Requestor that generated a FlowFile. This Processor is designed to be used in conjunction with " + "the HandleHttpRequest in order to create a web service.") @DynamicProperty(name = "An HTTP header name", value = "An HTTP header value", description = "These HTTPHeaders are set in the HTTP Response") -@ReadsAttribute(attribute = "http.context.identifier", description = "The value of this attribute is used to lookup the HTTP Response so that the " - + "proper message can be sent back to the requestor. If this attribute is missing, the FlowFile will be routed to 'failure.'") +@ReadsAttributes({ + @ReadsAttribute(attribute = HTTPUtils.HTTP_CONTEXT_ID, description = "The value of this attribute is used to lookup the HTTP Response so that the " + + "proper message can be sent back to the requestor. If this attribute is missing, the FlowFile will be routed to 'failure.'"), + @ReadsAttribute(attribute = HTTPUtils.HTTP_REQUEST_URI, description = "Value of the URI requested by the client. Used for provenance event."), + @ReadsAttribute(attribute = HTTPUtils.HTTP_REMOTE_HOST, description = "IP address of the client. Used for provenance event."), + @ReadsAttribute(attribute = HTTPUtils.HTTP_LOCAL_NAME, description = "IP address/hostname of the server. Used for provenance event."), + @ReadsAttribute(attribute = HTTPUtils.HTTP_PORT, description = "Listening port of the server. Used for provenance event."), + @ReadsAttribute(attribute = HTTPUtils.HTTP_SSL_CERT, description = "SSL distinguished name (if any). Used for provenance event.")}) @SeeAlso(value = {HandleHttpRequest.class}, classNames = {"org.apache.nifi.http.StandardHttpContextMap", "org.apache.nifi.ssl.StandardSSLContextService"}) public class HandleHttpResponse extends AbstractProcessor { public static final Pattern NUMBER_PATTERN = Pattern.compile("[0-9]+"); - public static final String HTTP_CONTEXT_ID = "http.context.identifier"; public static final PropertyDescriptor STATUS_CODE = new PropertyDescriptor.Builder() .name("HTTP Status Code") @@ -113,10 +122,12 @@ public class HandleHttpResponse extends AbstractProcessor { return; } - final String contextIdentifier = flowFile.getAttribute(HTTP_CONTEXT_ID); + final StopWatch stopWatch = new StopWatch(true); + + final String contextIdentifier = flowFile.getAttribute(HTTPUtils.HTTP_CONTEXT_ID); if (contextIdentifier == null) { session.transfer(flowFile, REL_FAILURE); - getLogger().warn("Failed to respond to HTTP request for {} because FlowFile did not have an 'http.context.identifier' attribute", + getLogger().warn("Failed to respond to HTTP request for {} because FlowFile did not have an '" + HTTPUtils.HTTP_CONTEXT_ID + "' attribute", new Object[]{flowFile}); return; } @@ -132,7 +143,7 @@ public class HandleHttpResponse extends AbstractProcessor { if (response == null) { session.transfer(flowFile, REL_FAILURE); getLogger().error("Failed to respond to HTTP request for {} because FlowFile had an '{}' attribute of {} but could not find an HTTP Response Object for this identifier", - new Object[]{flowFile, HTTP_CONTEXT_ID, contextIdentifier}); + new Object[]{flowFile, HTTPUtils.HTTP_CONTEXT_ID, contextIdentifier}); return; } @@ -168,6 +179,7 @@ public class HandleHttpResponse extends AbstractProcessor { return; } + session.getProvenanceReporter().send(flowFile, HTTPUtils.getURI(flowFile.getAttributes()), stopWatch.getElapsed(TimeUnit.MILLISECONDS)); session.transfer(flowFile, REL_SUCCESS); getLogger().info("Successfully responded to HTTP Request for {} with status code {}", new Object[]{flowFile, statusCode}); } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/HTTPUtils.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/HTTPUtils.java new file mode 100644 index 0000000000..937554d70e --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/HTTPUtils.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard.util; + +import java.util.Map; + +public class HTTPUtils { + + public static final String HTTP_REQUEST_URI = "http.request.uri"; + public static final String HTTP_REMOTE_HOST = "http.remote.host"; + public static final String HTTP_LOCAL_NAME = "http.local.name"; + public static final String HTTP_PORT = "http.server.port"; + public static final String HTTP_SSL_CERT = "http.subject.dn"; + public static final String HTTP_CONTEXT_ID = "http.context.identifier"; + + public static String getURI(Map map) { + final String client = map.get(HTTP_REMOTE_HOST); + final String server = map.get(HTTP_LOCAL_NAME); + final String port = map.get(HTTP_PORT); + final String uri = map.get(HTTP_REQUEST_URI); + if(map.get(HTTP_SSL_CERT) == null) { + return "http://" + client + "@" + server + ":" + port + uri; + } else { + return "https://" + client + "@" + server + ":" + port + uri; + } + } + +} diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestHandleHttpResponse.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestHandleHttpResponse.java index 84fb26d88f..bdf1cdce35 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestHandleHttpResponse.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestHandleHttpResponse.java @@ -39,6 +39,8 @@ import javax.servlet.http.HttpServletResponse; import org.apache.nifi.controller.AbstractControllerService; import org.apache.nifi.http.HttpContextMap; import org.apache.nifi.processor.exception.FlowFileAccessException; +import org.apache.nifi.processors.standard.util.HTTPUtils; +import org.apache.nifi.provenance.ProvenanceEventType; import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; @@ -63,7 +65,12 @@ public class TestHandleHttpResponse { runner.setProperty("no-valid-attr", "${no-valid-attr}"); final Map attributes = new HashMap<>(); - attributes.put(HandleHttpResponse.HTTP_CONTEXT_ID, "my-id"); + attributes.put(HTTPUtils.HTTP_CONTEXT_ID, "my-id"); + attributes.put(HTTPUtils.HTTP_REQUEST_URI, "/test"); + attributes.put(HTTPUtils.HTTP_LOCAL_NAME, "server"); + attributes.put(HTTPUtils.HTTP_PORT, "8443"); + attributes.put(HTTPUtils.HTTP_REMOTE_HOST, "client"); + attributes.put(HTTPUtils.HTTP_SSL_CERT, "sslDN"); attributes.put("my-attr", "hello"); attributes.put("status.code", "201"); @@ -72,6 +79,9 @@ public class TestHandleHttpResponse { runner.run(); runner.assertAllFlowFilesTransferred(HandleHttpResponse.REL_SUCCESS, 1); + assertTrue(runner.getProvenanceEvents().size() == 1); + assertEquals(ProvenanceEventType.SEND, runner.getProvenanceEvents().get(0).getEventType()); + assertEquals("https://client@server:8443/test", runner.getProvenanceEvents().get(0).getTransitUri()); assertEquals("hello", contextMap.baos.toString()); assertEquals("hello", contextMap.headersSent.get("my-attr")); @@ -94,7 +104,7 @@ public class TestHandleHttpResponse { runner.setProperty("no-valid-attr", "${no-valid-attr}"); final Map attributes = new HashMap<>(); - attributes.put(HandleHttpResponse.HTTP_CONTEXT_ID, "my-id"); + attributes.put(HTTPUtils.HTTP_CONTEXT_ID, "my-id"); attributes.put("my-attr", "hello"); attributes.put("status.code", "201");