NIFI-221: Fixed bugs and pulled in changes from develop

This commit is contained in:
Mark Payne 2015-02-26 09:05:28 -05:00
parent 739f0c25e2
commit 2d48323878
17 changed files with 1183 additions and 0 deletions

View File

@ -155,6 +155,11 @@
<groupId>org.apache.nifi</groupId> <groupId>org.apache.nifi</groupId>
<artifactId>nifi-kafka-nar</artifactId> <artifactId>nifi-kafka-nar</artifactId>
<type>nar</type> <type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-http-context-map-nar</artifactId>
<type>nar</type>
</dependency> </dependency>
</dependencies> </dependencies>

View File

@ -47,6 +47,10 @@
<dependency> <dependency>
<groupId>org.apache.nifi</groupId> <groupId>org.apache.nifi</groupId>
<artifactId>nifi-distributed-cache-client-service-api</artifactId> <artifactId>nifi-distributed-cache-client-service-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-http-context-map-api</artifactId>
</dependency> </dependency>
<dependency> <dependency>
<groupId>commons-io</groupId> <groupId>commons-io</groupId>

View File

@ -0,0 +1,519 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.security.Principal;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import javax.security.cert.X509Certificate;
import javax.servlet.AsyncContext;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.http.HttpContextMap;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.ssl.SSLContextService;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.HttpConnectionFactory;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.SecureRequestCustomizer;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.server.SslConnectionFactory;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import com.sun.jersey.api.client.ClientResponse.Status;
@Tags({"http", "https", "request", "listen", "ingress", "web service"})
@CapabilityDescription("Starts an HTTP Server and listens for HTTP Requests. For each request, creates a FlowFile and transfers to 'success'. This Processor is designed to be used in conjunction with the HandleHttpResponse Processor in order to create a Web Service")
public class HandleHttpRequest extends AbstractProcessor {
public static final String HTTP_CONTEXT_ID = "http.context.identifier";
// Allowable values for client auth
public static final AllowableValue CLIENT_NONE = new AllowableValue("No Authentication", "Processor will not authenticate clients. Anyone can communicate with this Processor anonymously");
public static final AllowableValue CLIENT_WANT = new AllowableValue("Want Authentication", "Processor will try to verify the client but if unable to verify will allow the client to communicate anonymously");
public static final AllowableValue CLIENT_NEED = new AllowableValue("Need Authentication", "Processor will reject communications from any client unless the client provides a certificate that is trusted by the TrustStore specified in the SSL Context Service");
public static final PropertyDescriptor PORT = new PropertyDescriptor.Builder()
.name("Listening Port")
.description("The Port to listen on for incoming HTTP requests")
.required(true)
.addValidator(StandardValidators.createLongValidator(0L, 65535L, true))
.expressionLanguageSupported(false)
.defaultValue("80")
.build();
public static final PropertyDescriptor HOSTNAME = new PropertyDescriptor.Builder()
.name("Hostname")
.description("The Hostname to bind to. If not specified, will bind to all hosts")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(false)
.build();
public static final PropertyDescriptor HTTP_CONTEXT_MAP = new PropertyDescriptor.Builder()
.name("HTTP Context Map")
.description("The HTTP Context Map Controller Service to use for caching the HTTP Request Information")
.required(true)
.identifiesControllerService(HttpContextMap.class)
.build();
public static final PropertyDescriptor SSL_CONTEXT = new PropertyDescriptor.Builder()
.name("SSL Context Service")
.description("The SSL Context Service to use in order to secure the server. If specified, the server will accept only HTTPS requests; otherwise, the server will accept only HTTP requests")
.required(false)
.identifiesControllerService(SSLContextService.class)
.build();
public static final PropertyDescriptor PATH_REGEX = new PropertyDescriptor.Builder()
.name("Allowed Paths")
.description("A Regular Expression that specifies the valid HTTP Paths that are allowed in the incoming URL Requests. If this value is specified and the path of the HTTP Requests does not match this Regular Expression, the Processor will respond with a 404: NotFound")
.required(false)
.addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
.expressionLanguageSupported(false)
.build();
public static final PropertyDescriptor ALLOW_GET = new PropertyDescriptor.Builder()
.name("Allow GET")
.description("Allow HTTP GET Method")
.required(true)
.allowableValues("true", "false")
.defaultValue("true")
.build();
public static final PropertyDescriptor ALLOW_POST = new PropertyDescriptor.Builder()
.name("Allow POST")
.description("Allow HTTP POST Method")
.required(true)
.allowableValues("true", "false")
.defaultValue("true")
.build();
public static final PropertyDescriptor ALLOW_PUT = new PropertyDescriptor.Builder()
.name("Allow PUT")
.description("Allow HTTP PUT Method")
.required(true)
.allowableValues("true", "false")
.defaultValue("true")
.build();
public static final PropertyDescriptor ALLOW_DELETE = new PropertyDescriptor.Builder()
.name("Allow DELETE")
.description("Allow HTTP DELETE Method")
.required(true)
.allowableValues("true", "false")
.defaultValue("true")
.build();
public static final PropertyDescriptor ALLOW_HEAD = new PropertyDescriptor.Builder()
.name("Allow HEAD")
.description("Allow HTTP HEAD Method")
.required(true)
.allowableValues("true", "false")
.defaultValue("false")
.build();
public static final PropertyDescriptor ALLOW_OPTIONS = new PropertyDescriptor.Builder()
.name("Allow OPTIONS")
.description("Allow HTTP OPTIONS Method")
.required(true)
.allowableValues("true", "false")
.defaultValue("false")
.build();
public static final PropertyDescriptor ADDITIONAL_METHODS = new PropertyDescriptor.Builder()
.name("Additional HTTP Methods")
.description("A comma-separated list of non-standard HTTP Methods that should be allowed")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(false)
.build();
public static final PropertyDescriptor CLIENT_AUTH = new PropertyDescriptor.Builder()
.name("Client Authentication")
.description("Specifies whether or not the Processor should authenticate clients. This value is ignored if the <SSL Context Service> Property is not specified or the SSL Context provided uses only a KeyStore and not a TrustStore.")
.required(true)
.allowableValues(CLIENT_NONE, CLIENT_WANT, CLIENT_NEED)
.defaultValue(CLIENT_NONE.getValue())
.build();
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("All content that is received is routed to the 'success' relationship")
.build();
private volatile Server server;
private final BlockingQueue<HttpRequestContainer> containerQueue = new LinkedBlockingQueue<>(50);
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> descriptors = new ArrayList<>();
descriptors.add(PORT);
descriptors.add(HOSTNAME);
descriptors.add(SSL_CONTEXT);
descriptors.add(HTTP_CONTEXT_MAP);
descriptors.add(PATH_REGEX);
descriptors.add(ALLOW_GET);
descriptors.add(ALLOW_POST);
descriptors.add(ALLOW_PUT);
descriptors.add(ALLOW_DELETE);
descriptors.add(ALLOW_HEAD);
descriptors.add(ALLOW_OPTIONS);
descriptors.add(ADDITIONAL_METHODS);
return descriptors;
}
@Override
public Set<Relationship> getRelationships() {
return Collections.singleton(REL_SUCCESS);
}
@OnScheduled
public void initializeServer(final ProcessContext context) throws Exception {
final String host = context.getProperty(HOSTNAME).getValue();
final int port = context.getProperty(PORT).asInteger();
final SSLContextService sslService = context.getProperty(SSL_CONTEXT).asControllerService(SSLContextService.class);
final String clientAuthValue = context.getProperty(CLIENT_AUTH).getValue();
final boolean need;
final boolean want;
if ( CLIENT_NEED.equals(clientAuthValue) ) {
need = true;
want = false;
} else if ( CLIENT_WANT.equals(clientAuthValue) ) {
need = false;
want = true;
} else {
need = false;
want = false;
}
final SslContextFactory sslFactory = (sslService == null) ? null : createSslFactory(sslService, need, want);
final Server server = new Server(port);
// create the http configuration
final HttpConfiguration httpConfiguration = new HttpConfiguration();
if ( sslFactory == null ) {
// create the connector
final ServerConnector http = new ServerConnector(server, new HttpConnectionFactory(httpConfiguration));
// set host and port
if (StringUtils.isNotBlank(host)) {
http.setHost(host);
}
http.setPort(port);
// add this connector
server.setConnectors(new Connector[] {http});
} else {
// add some secure config
final HttpConfiguration httpsConfiguration = new HttpConfiguration(httpConfiguration);
httpsConfiguration.setSecureScheme("https");
httpsConfiguration.setSecurePort(port);
httpsConfiguration.addCustomizer(new SecureRequestCustomizer());
// build the connector
final ServerConnector https = new ServerConnector(server,
new SslConnectionFactory(sslFactory, "http/1.1"),
new HttpConnectionFactory(httpsConfiguration));
// set host and port
if (StringUtils.isNotBlank(host)) {
https.setHost(host);
}
https.setPort(port);
// add this connector
server.setConnectors(new Connector[] {https});
}
final Set<String> allowedMethods = new HashSet<>();
if ( context.getProperty(ALLOW_GET).asBoolean() ) {
allowedMethods.add("GET");
}
if ( context.getProperty(ALLOW_POST).asBoolean() ) {
allowedMethods.add("POST");
}
if ( context.getProperty(ALLOW_PUT).asBoolean() ) {
allowedMethods.add("PUT");
}
if ( context.getProperty(ALLOW_DELETE).asBoolean() ) {
allowedMethods.add("DELETE");
}
if ( context.getProperty(ALLOW_HEAD).asBoolean() ) {
allowedMethods.add("HEAD");
}
if ( context.getProperty(ALLOW_OPTIONS).asBoolean() ) {
allowedMethods.add("OPTIONS");
}
final String additionalMethods = context.getProperty(ADDITIONAL_METHODS).getValue();
if ( additionalMethods != null ) {
for ( final String additionalMethod : additionalMethods.split(",") ) {
final String trimmed = additionalMethod.trim();
if ( !trimmed.isEmpty() ) {
allowedMethods.add(trimmed.toUpperCase());
}
}
}
final String pathRegex = context.getProperty(PATH_REGEX).getValue();
final Pattern pathPattern = (pathRegex == null) ? null : Pattern.compile(pathRegex);
server.setHandler(new AbstractHandler() {
@Override
public void handle(final String target, final Request baseRequest, final HttpServletRequest request,
final HttpServletResponse response) throws IOException, ServletException {
final String requestUri = request.getRequestURI();
if ( !allowedMethods.contains(request.getMethod().toUpperCase()) ) {
getLogger().info("Sending back METHOD_NOT_ALLOWED response to {}; method was {}; request URI was {}",
new Object[] {request.getRemoteAddr(), request.getMethod(), requestUri});
response.sendError(Status.METHOD_NOT_ALLOWED.getStatusCode());
return;
}
if ( pathPattern != null ) {
final URI uri;
try {
uri = new URI(requestUri);
} catch (final URISyntaxException e) {
throw new ServletException(e);
}
if ( !pathPattern.matcher(uri.getPath()).matches() ) {
response.sendError(Status.NOT_FOUND.getStatusCode());
getLogger().info("Sending back NOT_FOUND response to {}; request was {} {}",
new Object[] {request.getRemoteAddr(), request.getMethod(), requestUri});
return;
}
}
// If destination queues full, send back a 503: Service Unavailable.
if ( context.getAvailableRelationships().isEmpty() ) {
response.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE);
return;
}
// Right now, that information, though, is only in the ProcessSession, not the ProcessContext,
// so it is not known to us. Should see if it can be added to the ProcessContext.
final AsyncContext async = baseRequest.startAsync();
final boolean added = containerQueue.offer(new HttpRequestContainer(request, response, async));
if ( added ) {
getLogger().debug("Added Http Request to queue for {} {} from {}", new Object[] {request.getMethod(), requestUri, request.getRemoteAddr()});
} else {
getLogger().info("Sending back a SERVICE_UNAVAILABLE response to {}; request was {} {}",
new Object[] {request.getRemoteAddr(), request.getMethod(), request.getRemoteAddr()});
response.sendError(Status.SERVICE_UNAVAILABLE.getStatusCode());
response.flushBuffer();
async.complete();
}
}
});
this.server = server;
server.start();
getLogger().info("Server started and listening on port " + getPort());
}
protected int getPort() {
for ( final Connector connector : server.getConnectors() ) {
if ( connector instanceof ServerConnector ) {
return ((ServerConnector) connector).getPort();
}
}
throw new IllegalStateException("Server is not listening on any ports");
}
protected int getRequestQueueSize() {
return containerQueue.size();
}
private SslContextFactory createSslFactory(final SSLContextService sslService, final boolean needClientAuth, final boolean wantClientAuth) {
final SslContextFactory sslFactory = new SslContextFactory();
sslFactory.setNeedClientAuth(needClientAuth);
sslFactory.setWantClientAuth(wantClientAuth);
if ( sslService.isKeyStoreConfigured() ) {
sslFactory.setKeyStorePath(sslService.getKeyStoreFile());
sslFactory.setKeyStorePassword(sslService.getKeyStorePassword());
sslFactory.setKeyStoreType(sslService.getKeyStoreType());
}
if ( sslService.isTrustStoreConfigured() ) {
sslFactory.setTrustStorePath(sslService.getTrustStoreFile());
sslFactory.setTrustStorePassword(sslService.getTrustStorePassword());
sslFactory.setTrustStoreType(sslService.getTrustStoreType());
}
return sslFactory;
}
@OnStopped
public void shutdown() throws Exception {
if ( server != null ) {
getLogger().debug("Shutting down server");
server.stop();
server.join();
getLogger().info("Shut down {}", new Object[] {server});
}
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
final HttpRequestContainer container = containerQueue.poll();
if ( container == null ) {
return;
}
final long start = System.nanoTime();
final HttpServletRequest request = container.getRequest();
FlowFile flowFile = session.create();
try {
flowFile = session.importFrom(request.getInputStream(), flowFile);
} catch (final IOException e) {
getLogger().error("Failed to receive content from HTTP Request from {} due to {}", new Object[] {request.getRemoteAddr(), e});
session.remove(flowFile);
return;
}
final String contextIdentifier = UUID.randomUUID().toString();
final Map<String, String> attributes = new HashMap<>();
putAttribute(attributes, HTTP_CONTEXT_ID, contextIdentifier);
putAttribute(attributes, "mime.type", request.getContentType());
putAttribute(attributes, "http.servlet.path", request.getServletPath());
putAttribute(attributes, "http.context.path", request.getContextPath());
putAttribute(attributes, "http.method", request.getMethod());
putAttribute(attributes, "http.query.string", request.getQueryString());
putAttribute(attributes, "http.remote.host", request.getRemoteHost());
putAttribute(attributes, "http.remote.addr", request.getRemoteAddr());
putAttribute(attributes, "http.remote.user", request.getRemoteUser());
putAttribute(attributes, "http.request.uri", request.getRequestURI());
putAttribute(attributes, "http.auth.type", request.getAuthType());
final Enumeration<String> headerNames = request.getHeaderNames();
while ( headerNames.hasMoreElements() ) {
final String headerName = headerNames.nextElement();
final String headerValue = request.getHeader(headerName);
putAttribute(attributes, "http.headers." + headerName, headerValue);
}
final Principal principal = request.getUserPrincipal();
if ( principal != null ) {
putAttribute(attributes, "http.principal.name", principal.getName());
}
final X509Certificate certs[] = (X509Certificate[]) request.getAttribute("javax.servlet.request.X509Certificate");
final String subjectDn;
if ( certs != null && certs.length > 0 ) {
final X509Certificate cert = certs[0];
subjectDn = cert.getSubjectDN().getName();
final String issuerDn = cert.getIssuerDN().getName();
putAttribute(attributes, "http.subject.dn", subjectDn);
putAttribute(attributes, "http.issuer.dn", issuerDn);
} else {
subjectDn = null;
}
flowFile = session.putAllAttributes(flowFile, attributes);
final HttpContextMap contextMap = context.getProperty(HTTP_CONTEXT_MAP).asControllerService(HttpContextMap.class);
final boolean registered = contextMap.register(contextIdentifier, request, container.getResponse(), container.getContext());
if ( !registered ) {
getLogger().warn("Received request from {} but could not process it because too many requests are already outstanding; responding with SERVICE_UNAVAILABLE", new Object[] {request.getRemoteAddr()});
try {
container.getResponse().setStatus(Status.SERVICE_UNAVAILABLE.getStatusCode());
container.getResponse().flushBuffer();
container.getContext().complete();
} catch (final Exception e) {
getLogger().warn("Failed to respond with SERVICE_UNAVAILABLE message to {} due to {}", new Object[] {request.getRemoteAddr(), e});
}
return;
}
final long receiveMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
session.getProvenanceReporter().receive(flowFile, request.getRequestURI(), "Received from " + request.getRemoteAddr() + (subjectDn == null ? "" : " with DN=" + subjectDn), receiveMillis);
session.transfer(flowFile, REL_SUCCESS);
getLogger().info("Transferring {} to 'success'; received from {}", new Object[] {flowFile, request.getRemoteAddr()});
}
private void putAttribute(final Map<String, String> map, final String key, final String value) {
if ( value == null ) {
return;
}
map.put(key, value);
}
private static class HttpRequestContainer {
private final HttpServletRequest request;
private final HttpServletResponse response;
private final AsyncContext context;
public HttpRequestContainer(final HttpServletRequest request, final HttpServletResponse response, final AsyncContext async) {
this.request = request;
this.response = response;
this.context = async;
}
public HttpServletRequest getRequest() {
return request;
}
public HttpServletResponse getResponse() {
return response;
}
public AsyncContext getContext() {
return context;
}
}
}

View File

@ -0,0 +1,178 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;
import javax.servlet.http.HttpServletResponse;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.http.HttpContextMap;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
@Tags({"http", "https", "response", "egress", "web service"})
@CapabilityDescription("Sends an HTTP Response to the Requestor that generated a FlowFile. This Processor is designed to be used in conjunction with the HandleHttpRequest in order to create a web service.")
public class HandleHttpResponse extends AbstractProcessor {
public static final Pattern NUMBER_PATTERN = Pattern.compile("[0-9]+");
public static final String HTTP_CONTEXT_ID = "http.context.identifier";
public static final PropertyDescriptor STATUS_CODE = new PropertyDescriptor.Builder()
.name("HTTP Status Code")
.description("The HTTP Status Code to use when responding to the HTTP Request. See Section 10 of RFC 2616 for more information.")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(true)
.build();
public static final PropertyDescriptor HTTP_CONTEXT_MAP = new PropertyDescriptor.Builder()
.name("HTTP Context Map")
.description("The HTTP Context Map Controller Service to use for caching the HTTP Request Information")
.required(true)
.identifiesControllerService(HttpContextMap.class)
.build();
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("FlowFiles will be routed to this Relationship after the response has been successfully sent to the requestor")
.build();
public static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description("FlowFiles will be routed to this Relationship if the Processor is unable to respond to the requestor. This may happen, for instance, if the connection times out or if NiFi is restarted before responding to the HTTP Request.")
.build();
@Override
public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> properties = new ArrayList<>();
properties.add(STATUS_CODE);
properties.add(HTTP_CONTEXT_MAP);
return properties;
}
@Override
public Set<Relationship> getRelationships() {
final Set<Relationship> relationships = new HashSet<>();
relationships.add(REL_SUCCESS);
relationships.add(REL_FAILURE);
return relationships;
}
@Override
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
return new PropertyDescriptor.Builder()
.description("Specifies the value to send for the '" + propertyDescriptorName + "' HTTP Header")
.name(propertyDescriptorName)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.dynamic(true)
.expressionLanguageSupported(true)
.build();
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
FlowFile flowFile = session.get();
if ( flowFile == null ) {
return;
}
final String contextIdentifier = flowFile.getAttribute(HTTP_CONTEXT_ID);
if ( contextIdentifier == null ) {
session.transfer(flowFile, REL_FAILURE);
getLogger().warn("Failed to respond to HTTP request for {} because FlowFile did not have an "
+ "'http.context.identifier' attribute", new Object[] {flowFile});
return;
}
final String statusCodeValue = context.getProperty(STATUS_CODE).evaluateAttributeExpressions(flowFile).getValue();
if ( !isNumber(statusCodeValue) ) {
session.transfer(flowFile, REL_FAILURE);
getLogger().error("Failed to response to HTTP request for {} because status code was '{}', which is not a valid number", new Object[] {flowFile, statusCodeValue});
}
final HttpContextMap contextMap = context.getProperty(HTTP_CONTEXT_MAP).asControllerService(HttpContextMap.class);
final HttpServletResponse response = contextMap.getResponse(contextIdentifier);
if ( response == null ) {
session.transfer(flowFile, REL_FAILURE);
getLogger().error("Failed to respond to HTTP request for {} because FlowFile had an '{}' "
+ "attribute of {} but could not find an HTTP Response Object for this identifier", new Object[] {flowFile, HTTP_CONTEXT_ID, contextIdentifier});
return;
}
final int statusCode = Integer.parseInt(statusCodeValue);
response.setStatus(statusCode);
for ( final Map.Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet() ) {
final PropertyDescriptor descriptor = entry.getKey();
if ( descriptor.isDynamic() ) {
final String headerName = descriptor.getName();
final String headerValue = context.getProperty(descriptor).evaluateAttributeExpressions(flowFile).getValue();
if ( !headerValue.trim().isEmpty() ) {
response.setHeader(headerName, headerValue);
}
}
}
try {
session.exportTo(flowFile, response.getOutputStream());
response.flushBuffer();
} catch (final IOException ioe) {
session.transfer(flowFile, REL_FAILURE);
getLogger().error("Failed to respond to HTTP request for {} due to {}", new Object[] {flowFile, ioe});
return;
}
try {
contextMap.complete(contextIdentifier);
} catch (final IllegalStateException ise) {
getLogger().error("Failed to complete HTTP Transaction for {} due to {}", new Object[] {flowFile, ise});
session.transfer(flowFile, REL_FAILURE);
return;
}
session.transfer(flowFile, REL_SUCCESS);
getLogger().info("Successfully responded to HTTP Request for {} with status code {}", new Object[] {flowFile, statusCode});
}
private static boolean isNumber(final String value) {
if ( value.length() == 0 ) {
return false;
}
for (int i=0; i < value.length(); i++) {
if ( !Character.isDigit(value.charAt(i)) ) {
return false;
}
}
return true;
}
}

View File

@ -28,6 +28,8 @@ org.apache.nifi.processors.standard.GetFile
org.apache.nifi.processors.standard.GetFTP org.apache.nifi.processors.standard.GetFTP
org.apache.nifi.processors.standard.GetHTTP org.apache.nifi.processors.standard.GetHTTP
org.apache.nifi.processors.standard.GetSFTP org.apache.nifi.processors.standard.GetSFTP
org.apache.nifi.processors.standard.HandleHttpRequest
org.apache.nifi.processors.standard.HandleHttpResponse
org.apache.nifi.processors.standard.HashAttribute org.apache.nifi.processors.standard.HashAttribute
org.apache.nifi.processors.standard.HashContent org.apache.nifi.processors.standard.HashContent
org.apache.nifi.processors.standard.IdentifyMimeType org.apache.nifi.processors.standard.IdentifyMimeType

View File

@ -0,0 +1,23 @@
<?xml version="1.0"?>
<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-standard-services</artifactId>
<version>0.0.2-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-http-context-map-api</artifactId>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>
</dependency>
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>javax.servlet-api</artifactId>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.http;
import javax.servlet.AsyncContext;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.nifi.controller.ControllerService;
/**
* <p>
* An interface that provides the capability of receiving an HTTP servlet request in one component
* and responding to that request in another component.
* </p>
*
* <p>
* The intended flow is for the component receiving the HTTP request to register the request, response,
* and AsyncContext with a particular identifier via the
* {@link #register(String, HttpServletRequest, HttpServletResponse, AsyncContext)}
* method. Another component is then able to obtain the response
* by providing that identifier to the {@link #getResponse(String)} method. After writing to the
* HttpServletResponse, the transaction is to then be completed via the {@link #complete(String)} method.
* </p>
*/
public interface HttpContextMap extends ControllerService {
/**
* Registers an HttpServletRequest, HttpServletResponse, and the AsyncContext for a given identifier
*
* @param identifier
* @param request
* @param response
* @param context
*
* @return true if register is successful, false if the context map is too full because too many requests have already been received and not processed
*
* @throws IllegalStateException if the identifier is already registered
*/
boolean register(String identifier, HttpServletRequest request, HttpServletResponse response, AsyncContext context);
/**
* Retrieves the HttpServletResponse for the given identifier, if it exists
* @param identifier
* @return the HttpServletResponse for the given identifier, or {@code null} if it does not exist
*/
HttpServletResponse getResponse(String identifier);
/**
* Marks the HTTP request/response for the given identifier as complete
* @param identifier
*
* @throws IllegalStateException if the identifier is not registered to a valid AsyncContext
*/
void complete(String identifier);
}

View File

@ -0,0 +1,37 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-http-context-map-bundle</artifactId>
<version>0.0.2-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-http-context-map-nar</artifactId>
<packaging>nar</packaging>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-standard-services-api-nar</artifactId>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-http-context-map</artifactId>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,44 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-http-context-map-bundle</artifactId>
<version>0.0.2-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-http-context-map</artifactId>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-processor-utils</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-http-context-map-api</artifactId>
</dependency>
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>javax.servlet-api</artifactId>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,176 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.http;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.servlet.AsyncContext;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.nifi.annotation.lifecycle.OnDisabled;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.processor.util.StandardValidators;
public class StandardHttpContextMap extends AbstractControllerService implements HttpContextMap {
public static final PropertyDescriptor MAX_OUTSTANDING_REQUESTS = new PropertyDescriptor.Builder()
.name("Maximum Outstanding Requests")
.description("The maximum number of HTTP requests that can be outstanding at any one time. Any attempt to register an additional HTTP Request will cause an error")
.required(true)
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.defaultValue("5000")
.build();
public static final PropertyDescriptor REQUEST_EXPIRATION = new PropertyDescriptor.Builder()
.name("Request Expiration")
.description("Specifies how long an HTTP Request should be left unanswered before being evicted from the cache and being responded to with a Service Unavailable status code")
.required(true)
.expressionLanguageSupported(false)
.defaultValue("1 min")
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.build();
private final ConcurrentMap<String, Wrapper> wrapperMap = new ConcurrentHashMap<>();
private volatile int maxSize = 5000;
private volatile long maxRequestNanos;
private volatile ScheduledExecutorService executor;
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> properties = new ArrayList<>(2);
properties.add(MAX_OUTSTANDING_REQUESTS);
properties.add(REQUEST_EXPIRATION);
return properties;
}
@OnEnabled
public void onConfigured(final ConfigurationContext context) {
maxSize = context.getProperty(MAX_OUTSTANDING_REQUESTS).asInteger();
executor = Executors.newSingleThreadScheduledExecutor();
maxRequestNanos = context.getProperty(REQUEST_EXPIRATION).asTimePeriod(TimeUnit.NANOSECONDS);
final long scheduleNanos = maxRequestNanos / 2;
executor.scheduleWithFixedDelay(new CleanupExpiredRequests(), scheduleNanos, scheduleNanos, TimeUnit.NANOSECONDS);
}
@OnDisabled
public void cleanup() {
if ( executor != null ) {
executor.shutdown();
}
}
@Override
public boolean register(final String identifier, final HttpServletRequest request, final HttpServletResponse response, final AsyncContext context) {
// fail if there are too many already. Maybe add a configuration property for how many
// outstanding, with a default of say 5000
if ( wrapperMap.size() >= maxSize ) {
return false;
}
final Wrapper wrapper = new Wrapper(request, response, context);
final Wrapper existing = wrapperMap.putIfAbsent(identifier, wrapper);
if ( existing != null ) {
throw new IllegalStateException("HTTP Request already registered with identifier " + identifier);
}
return true;
}
@Override
public HttpServletResponse getResponse(final String identifier) {
final Wrapper wrapper = wrapperMap.get(identifier);
if ( wrapper == null ) {
return null;
}
return wrapper.getResponse();
}
@Override
public void complete(final String identifier) {
final Wrapper wrapper = wrapperMap.remove(identifier);
if ( wrapper == null ) {
throw new IllegalStateException("No HTTP Request registered with identifier " + identifier);
}
wrapper.getAsync().complete();
}
private static class Wrapper {
@SuppressWarnings("unused")
private final HttpServletRequest request;
private final HttpServletResponse response;
private final AsyncContext async;
private final long nanoTimeAdded = System.nanoTime();
public Wrapper(final HttpServletRequest request, final HttpServletResponse response, final AsyncContext async) {
this.request = request;
this.response = response;
this.async = async;
}
public HttpServletResponse getResponse() {
return response;
}
public AsyncContext getAsync() {
return async;
}
public long getNanoTimeAdded() {
return nanoTimeAdded;
}
}
private class CleanupExpiredRequests implements Runnable {
@Override
public void run() {
final long now = System.nanoTime();
final long threshold = now - maxRequestNanos;
final Iterator<Map.Entry<String, Wrapper>> itr = wrapperMap.entrySet().iterator();
while ( itr.hasNext() ) {
final Map.Entry<String, Wrapper> entry = itr.next();
if ( entry.getValue().getNanoTimeAdded() < threshold ) {
itr.remove();
// send SERVICE_UNAVAILABLE
try {
final AsyncContext async = entry.getValue().getAsync();
((HttpServletResponse) async.getResponse()).sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE);
async.complete();
} catch (final IOException ioe) {
// we are trying to indicate that we are unavailable. If we have an IOException and cannot respond,
// then so be it. Nothing to really do here.
}
}
}
}
}
}

View File

@ -0,0 +1,67 @@
<!DOCTYPE html>
<html lang="en">
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<head>
<meta charset="utf-8" />
<title>StandardHttpContextMap</title>
<link rel="stylesheet" href="../../css/component-usage.css" type="text/css" />
</head>
<body>
<h2>Description:</h2>
<p>
This is the standard implementation of the SSL Context Map. This service is used to provide
coordination between
<a href="../org.apache.nifi.processors.standard.HandleHttpRequest/index.html">HandleHttpRequest</a>
and
<a href="../org.apache.nifi.processors.standard.HandleHttpResponse/index.html">HandleHttpResponse</a>
Processors.
</p>
<!-- Service Documentation ================================================== -->
<h2>Configuring the HTTP Context Map:</h2>
<p>
The <code>controller-services.xml</code> file is located in the NiFi <code>conf</code>
directory. The user may set up any number of controller services within this file.
</p>
<p>
This controller service exposes a single property named <code>Maximum Outstanding Requests</code>.
This property determines the maximum number of HTTP requests that can be outstanding at any one time.
Any attempt to register an additional HTTP Request will cause an error. The default value is 5000.
Below is an example of the template for a StandardHttpContextMap controller service.
</p>
<pre>
&lt;?xml version="1.0" encoding="UTF-8" ?&gt;
&lt;services&gt;
&lt;service&gt;
&lt;identifier&gt;http-context-map&lt;/identifier&gt;
&lt;class&gt;org.apache.nifi.http.StandardHttpContextMap&lt;/class&gt;
&lt;property name="Maximum Outstanding Requests"&gt;5000&lt;/property&gt;
&lt;/service&gt;
&lt;/services&gt;
</pre>
<p>
<strong>See Also:</strong><br />
<a href="../org.apache.nifi.processors.standard.HandleHttpRequest/index.html">HandleHttpRequest</a><br />
<a href="../org.apache.nifi.processors.standard.HandleHttpResponse/index.html">HandleHttpResponse</a><br />
</p>
</body>
</html>

View File

@ -0,0 +1,31 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-standard-services</artifactId>
<version>0.0.2-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-http-context-map-bundle</artifactId>
<packaging>pom</packaging>
<modules>
<module>nifi-http-context-map</module>
<module>nifi-http-context-map-nar</module>
</modules>
</project>

View File

@ -37,5 +37,10 @@
<artifactId>nifi-load-distribution-service-api</artifactId> <artifactId>nifi-load-distribution-service-api</artifactId>
<scope>compile</scope> <scope>compile</scope>
</dependency> </dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-http-context-map-api</artifactId>
<scope>compile</scope>
</dependency>
</dependencies> </dependencies>
</project> </project>

View File

@ -26,8 +26,10 @@
<module>nifi-distributed-cache-client-service-api</module> <module>nifi-distributed-cache-client-service-api</module>
<module>nifi-distributed-cache-services-bundle</module> <module>nifi-distributed-cache-services-bundle</module>
<module>nifi-load-distribution-service-api</module> <module>nifi-load-distribution-service-api</module>
<module>nifi-http-context-map-api</module>
<module>nifi-ssl-context-bundle</module> <module>nifi-ssl-context-bundle</module>
<module>nifi-ssl-context-service-api</module> <module>nifi-ssl-context-service-api</module>
<module>nifi-http-context-map-bundle</module>
<module>nifi-standard-services-api-nar</module> <module>nifi-standard-services-api-nar</module>
</modules> </modules>
</project> </project>

View File

@ -59,6 +59,12 @@
<artifactId>nifi-load-distribution-service-api</artifactId> <artifactId>nifi-load-distribution-service-api</artifactId>
<version>0.0.2-incubating-SNAPSHOT</version> <version>0.0.2-incubating-SNAPSHOT</version>
<scope>provided</scope> <scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-http-context-map-api</artifactId>
<version>0.0.2-incubating-SNAPSHOT</version>
<scope>provided</scope>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.apache.nifi</groupId> <groupId>org.apache.nifi</groupId>
@ -74,6 +80,11 @@
<groupId>org.apache.nifi</groupId> <groupId>org.apache.nifi</groupId>
<artifactId>nifi-ssl-context-service</artifactId> <artifactId>nifi-ssl-context-service</artifactId>
<version>0.0.2-incubating-SNAPSHOT</version> <version>0.0.2-incubating-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-http-context-map</artifactId>
<version>0.0.2-incubating-SNAPSHOT</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.apache.nifi</groupId> <groupId>org.apache.nifi</groupId>

View File

@ -752,6 +752,12 @@
<artifactId>nifi-kafka-nar</artifactId> <artifactId>nifi-kafka-nar</artifactId>
<version>0.0.2-incubating-SNAPSHOT</version> <version>0.0.2-incubating-SNAPSHOT</version>
<type>nar</type> <type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-http-context-map-nar</artifactId>
<version>0.0.2-incubating-SNAPSHOT</version>
<type>nar</type>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.apache.nifi</groupId> <groupId>org.apache.nifi</groupId>