NIFI-1816 Added provenance event SEND in HandleHttpResponse

This closes #389
This commit is contained in:
Pierre Villard 2016-04-28 18:59:42 +02:00 committed by jpercivall
parent 993d3cd78f
commit 9064b97631
4 changed files with 86 additions and 20 deletions

View File

@ -62,6 +62,7 @@ import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.standard.util.HTTPUtils;
import org.apache.nifi.ssl.SSLContextService;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.HttpConfiguration;
@ -80,20 +81,22 @@ import com.sun.jersey.api.client.ClientResponse.Status;
@CapabilityDescription("Starts an HTTP Server and listens for HTTP Requests. For each request, creates a FlowFile and transfers to 'success'. "
+ "This Processor is designed to be used in conjunction with the HandleHttpResponse Processor in order to create a Web Service")
@WritesAttributes({
@WritesAttribute(attribute = "http.context.identifier", description = "An identifier that allows the HandleHttpRequest and HandleHttpResponse "
@WritesAttribute(attribute = HTTPUtils.HTTP_CONTEXT_ID, description = "An identifier that allows the HandleHttpRequest and HandleHttpResponse "
+ "to coordinate which FlowFile belongs to which HTTP Request/Response."),
@WritesAttribute(attribute = "mime.type", description = "The MIME Type of the data, according to the HTTP Header \"Content-Type\""),
@WritesAttribute(attribute = "http.servlet.path", description = "The part of the request URL that is considered the Servlet Path"),
@WritesAttribute(attribute = "http.context.path", description = "The part of the request URL that is considered to be the Context Path"),
@WritesAttribute(attribute = "http.method", description = "The HTTP Method that was used for the request, such as GET or POST"),
@WritesAttribute(attribute = HTTPUtils.HTTP_LOCAL_NAME, description = "IP address/hostname of the server"),
@WritesAttribute(attribute = HTTPUtils.HTTP_PORT, description = "Listening port of the server"),
@WritesAttribute(attribute = "http.query.string", description = "The query string portion of hte Request URL"),
@WritesAttribute(attribute = "http.remote.host", description = "The hostname of the requestor"),
@WritesAttribute(attribute = HTTPUtils.HTTP_REMOTE_HOST, description = "The hostname of the requestor"),
@WritesAttribute(attribute = "http.remote.addr", description = "The hostname:port combination of the requestor"),
@WritesAttribute(attribute = "http.remote.user", description = "The username of the requestor"),
@WritesAttribute(attribute = "http.request.uri", description = "The full Request URL"),
@WritesAttribute(attribute = HTTPUtils.HTTP_REQUEST_URI, description = "The full Request URL"),
@WritesAttribute(attribute = "http.auth.type", description = "The type of HTTP Authorization used"),
@WritesAttribute(attribute = "http.principal.name", description = "The name of the authenticated user making the request"),
@WritesAttribute(attribute = "http.subject.dn", description = "The Distinguished Name of the requestor. This value will not be populated "
@WritesAttribute(attribute = HTTPUtils.HTTP_SSL_CERT, description = "The Distinguished Name of the requestor. This value will not be populated "
+ "unless the Processor is configured to use an SSLContext Service"),
@WritesAttribute(attribute = "http.issuer.dn", description = "The Distinguished Name of the entity that issued the Subject's certificate. "
+ "This value will not be populated unless the Processor is configured to use an SSLContext Service"),
@ -104,7 +107,6 @@ import com.sun.jersey.api.client.ClientResponse.Status;
classNames = {"org.apache.nifi.http.StandardHttpContextMap", "org.apache.nifi.ssl.StandardSSLContextService"})
public class HandleHttpRequest extends AbstractProcessor {
public static final String HTTP_CONTEXT_ID = "http.context.identifier";
private static final Pattern URL_QUERY_PARAM_DELIMITER = Pattern.compile("&");
// Allowable values for client auth
@ -493,20 +495,20 @@ public class HandleHttpRequest extends AbstractProcessor {
final String contextIdentifier = UUID.randomUUID().toString();
final Map<String, String> attributes = new HashMap<>();
try {
putAttribute(attributes, HTTP_CONTEXT_ID, contextIdentifier);
putAttribute(attributes, HTTPUtils.HTTP_CONTEXT_ID, contextIdentifier);
putAttribute(attributes, "mime.type", request.getContentType());
putAttribute(attributes, "http.servlet.path", request.getServletPath());
putAttribute(attributes, "http.context.path", request.getContextPath());
putAttribute(attributes, "http.method", request.getMethod());
putAttribute(attributes, "http.local.addr", request.getLocalAddr());
putAttribute(attributes, "http.local.name", request.getLocalName());
putAttribute(attributes, HTTPUtils.HTTP_LOCAL_NAME, request.getLocalName());
if (request.getQueryString() != null) {
putAttribute(attributes, "http.query.string", URLDecoder.decode(request.getQueryString(), charset));
}
putAttribute(attributes, "http.remote.host", request.getRemoteHost());
putAttribute(attributes, HTTPUtils.HTTP_REMOTE_HOST, request.getRemoteHost());
putAttribute(attributes, "http.remote.addr", request.getRemoteAddr());
putAttribute(attributes, "http.remote.user", request.getRemoteUser());
putAttribute(attributes, "http.request.uri", request.getRequestURI());
putAttribute(attributes, HTTPUtils.HTTP_REQUEST_URI, request.getRequestURI());
putAttribute(attributes, "http.request.url", request.getRequestURL().toString());
putAttribute(attributes, "http.auth.type", request.getAuthType());
@ -517,7 +519,7 @@ public class HandleHttpRequest extends AbstractProcessor {
putAttribute(attributes, "http.character.encoding", request.getCharacterEncoding());
putAttribute(attributes, "http.locale", request.getLocale());
putAttribute(attributes, "http.server.name", request.getServerName());
putAttribute(attributes, "http.server.port", request.getServerPort());
putAttribute(attributes, HTTPUtils.HTTP_PORT, request.getServerPort());
final Enumeration<String> paramEnumeration = request.getParameterNames();
while (paramEnumeration.hasMoreElements()) {
@ -585,7 +587,7 @@ public class HandleHttpRequest extends AbstractProcessor {
subjectDn = cert.getSubjectDN().getName();
final String issuerDn = cert.getIssuerDN().getName();
putAttribute(attributes, "http.subject.dn", subjectDn);
putAttribute(attributes, HTTPUtils.HTTP_SSL_CERT, subjectDn);
putAttribute(attributes, "http.issuer.dn", issuerDn);
} else {
subjectDn = null;
@ -613,7 +615,7 @@ public class HandleHttpRequest extends AbstractProcessor {
}
final long receiveMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
session.getProvenanceReporter().receive(flowFile, request.getRequestURI(), "Received from " + request.getRemoteAddr() + (subjectDn == null ? "" : " with DN=" + subjectDn), receiveMillis);
session.getProvenanceReporter().receive(flowFile, HTTPUtils.getURI(attributes), "Received from " + request.getRemoteAddr() + (subjectDn == null ? "" : " with DN=" + subjectDn), receiveMillis);
session.transfer(flowFile, REL_SUCCESS);
getLogger().info("Transferring {} to 'success'; received from {}", new Object[]{flowFile, request.getRemoteAddr()});
}

View File

@ -21,6 +21,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import javax.servlet.http.HttpServletResponse;
@ -29,6 +30,7 @@ import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.ReadsAttribute;
import org.apache.nifi.annotation.behavior.ReadsAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
@ -41,19 +43,26 @@ import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.standard.util.HTTPUtils;
import org.apache.nifi.util.StopWatch;
@InputRequirement(Requirement.INPUT_REQUIRED)
@Tags({"http", "https", "response", "egress", "web service"})
@CapabilityDescription("Sends an HTTP Response to the Requestor that generated a FlowFile. This Processor is designed to be used in conjunction with "
+ "the HandleHttpRequest in order to create a web service.")
@DynamicProperty(name = "An HTTP header name", value = "An HTTP header value", description = "These HTTPHeaders are set in the HTTP Response")
@ReadsAttribute(attribute = "http.context.identifier", description = "The value of this attribute is used to lookup the HTTP Response so that the "
+ "proper message can be sent back to the requestor. If this attribute is missing, the FlowFile will be routed to 'failure.'")
@ReadsAttributes({
@ReadsAttribute(attribute = HTTPUtils.HTTP_CONTEXT_ID, description = "The value of this attribute is used to lookup the HTTP Response so that the "
+ "proper message can be sent back to the requestor. If this attribute is missing, the FlowFile will be routed to 'failure.'"),
@ReadsAttribute(attribute = HTTPUtils.HTTP_REQUEST_URI, description = "Value of the URI requested by the client. Used for provenance event."),
@ReadsAttribute(attribute = HTTPUtils.HTTP_REMOTE_HOST, description = "IP address of the client. Used for provenance event."),
@ReadsAttribute(attribute = HTTPUtils.HTTP_LOCAL_NAME, description = "IP address/hostname of the server. Used for provenance event."),
@ReadsAttribute(attribute = HTTPUtils.HTTP_PORT, description = "Listening port of the server. Used for provenance event."),
@ReadsAttribute(attribute = HTTPUtils.HTTP_SSL_CERT, description = "SSL distinguished name (if any). Used for provenance event.")})
@SeeAlso(value = {HandleHttpRequest.class}, classNames = {"org.apache.nifi.http.StandardHttpContextMap", "org.apache.nifi.ssl.StandardSSLContextService"})
public class HandleHttpResponse extends AbstractProcessor {
public static final Pattern NUMBER_PATTERN = Pattern.compile("[0-9]+");
public static final String HTTP_CONTEXT_ID = "http.context.identifier";
public static final PropertyDescriptor STATUS_CODE = new PropertyDescriptor.Builder()
.name("HTTP Status Code")
@ -113,10 +122,12 @@ public class HandleHttpResponse extends AbstractProcessor {
return;
}
final String contextIdentifier = flowFile.getAttribute(HTTP_CONTEXT_ID);
final StopWatch stopWatch = new StopWatch(true);
final String contextIdentifier = flowFile.getAttribute(HTTPUtils.HTTP_CONTEXT_ID);
if (contextIdentifier == null) {
session.transfer(flowFile, REL_FAILURE);
getLogger().warn("Failed to respond to HTTP request for {} because FlowFile did not have an 'http.context.identifier' attribute",
getLogger().warn("Failed to respond to HTTP request for {} because FlowFile did not have an '" + HTTPUtils.HTTP_CONTEXT_ID + "' attribute",
new Object[]{flowFile});
return;
}
@ -132,7 +143,7 @@ public class HandleHttpResponse extends AbstractProcessor {
if (response == null) {
session.transfer(flowFile, REL_FAILURE);
getLogger().error("Failed to respond to HTTP request for {} because FlowFile had an '{}' attribute of {} but could not find an HTTP Response Object for this identifier",
new Object[]{flowFile, HTTP_CONTEXT_ID, contextIdentifier});
new Object[]{flowFile, HTTPUtils.HTTP_CONTEXT_ID, contextIdentifier});
return;
}
@ -168,6 +179,7 @@ public class HandleHttpResponse extends AbstractProcessor {
return;
}
session.getProvenanceReporter().send(flowFile, HTTPUtils.getURI(flowFile.getAttributes()), stopWatch.getElapsed(TimeUnit.MILLISECONDS));
session.transfer(flowFile, REL_SUCCESS);
getLogger().info("Successfully responded to HTTP Request for {} with status code {}", new Object[]{flowFile, statusCode});
}

View File

@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard.util;
import java.util.Map;
public class HTTPUtils {
public static final String HTTP_REQUEST_URI = "http.request.uri";
public static final String HTTP_REMOTE_HOST = "http.remote.host";
public static final String HTTP_LOCAL_NAME = "http.local.name";
public static final String HTTP_PORT = "http.server.port";
public static final String HTTP_SSL_CERT = "http.subject.dn";
public static final String HTTP_CONTEXT_ID = "http.context.identifier";
public static String getURI(Map<String, String> map) {
final String client = map.get(HTTP_REMOTE_HOST);
final String server = map.get(HTTP_LOCAL_NAME);
final String port = map.get(HTTP_PORT);
final String uri = map.get(HTTP_REQUEST_URI);
if(map.get(HTTP_SSL_CERT) == null) {
return "http://" + client + "@" + server + ":" + port + uri;
} else {
return "https://" + client + "@" + server + ":" + port + uri;
}
}
}

View File

@ -39,6 +39,8 @@ import javax.servlet.http.HttpServletResponse;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.http.HttpContextMap;
import org.apache.nifi.processor.exception.FlowFileAccessException;
import org.apache.nifi.processors.standard.util.HTTPUtils;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
@ -63,7 +65,12 @@ public class TestHandleHttpResponse {
runner.setProperty("no-valid-attr", "${no-valid-attr}");
final Map<String, String> attributes = new HashMap<>();
attributes.put(HandleHttpResponse.HTTP_CONTEXT_ID, "my-id");
attributes.put(HTTPUtils.HTTP_CONTEXT_ID, "my-id");
attributes.put(HTTPUtils.HTTP_REQUEST_URI, "/test");
attributes.put(HTTPUtils.HTTP_LOCAL_NAME, "server");
attributes.put(HTTPUtils.HTTP_PORT, "8443");
attributes.put(HTTPUtils.HTTP_REMOTE_HOST, "client");
attributes.put(HTTPUtils.HTTP_SSL_CERT, "sslDN");
attributes.put("my-attr", "hello");
attributes.put("status.code", "201");
@ -72,6 +79,9 @@ public class TestHandleHttpResponse {
runner.run();
runner.assertAllFlowFilesTransferred(HandleHttpResponse.REL_SUCCESS, 1);
assertTrue(runner.getProvenanceEvents().size() == 1);
assertEquals(ProvenanceEventType.SEND, runner.getProvenanceEvents().get(0).getEventType());
assertEquals("https://client@server:8443/test", runner.getProvenanceEvents().get(0).getTransitUri());
assertEquals("hello", contextMap.baos.toString());
assertEquals("hello", contextMap.headersSent.get("my-attr"));
@ -94,7 +104,7 @@ public class TestHandleHttpResponse {
runner.setProperty("no-valid-attr", "${no-valid-attr}");
final Map<String, String> attributes = new HashMap<>();
attributes.put(HandleHttpResponse.HTTP_CONTEXT_ID, "my-id");
attributes.put(HTTPUtils.HTTP_CONTEXT_ID, "my-id");
attributes.put("my-attr", "hello");
attributes.put("status.code", "201");