NIFI-221: Finished initial implementation of http procs

This commit is contained in:
Mark Payne 2015-03-01 14:31:26 -05:00
parent c53b0f9d15
commit 7c99054183
3 changed files with 353 additions and 12 deletions

View File

@ -17,8 +17,10 @@
package org.apache.nifi.processors.standard;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URLDecoder;
import java.security.Principal;
import java.util.ArrayList;
import java.util.Collections;
@ -73,6 +75,7 @@ import com.sun.jersey.api.client.ClientResponse.Status;
@CapabilityDescription("Starts an HTTP Server and listens for HTTP Requests. For each request, creates a FlowFile and transfers to 'success'. This Processor is designed to be used in conjunction with the HandleHttpResponse Processor in order to create a Web Service")
public class HandleHttpRequest extends AbstractProcessor {
public static final String HTTP_CONTEXT_ID = "http.context.identifier";
private static final Pattern URL_QUERY_PARAM_DELIMITER = Pattern.compile("&");
// Allowable values for client auth
public static final AllowableValue CLIENT_NONE = new AllowableValue("No Authentication", "Processor will not authenticate clients. Anyone can communicate with this Processor anonymously");
@ -107,6 +110,13 @@ public class HandleHttpRequest extends AbstractProcessor {
.required(false)
.identifiesControllerService(SSLContextService.class)
.build();
public static final PropertyDescriptor URL_CHARACTER_SET = new PropertyDescriptor.Builder()
.name("URL Character Set")
.description("The character set to use for decoding URL parameters")
.required(true)
.defaultValue("UTF-8")
.addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
.build();
public static final PropertyDescriptor PATH_REGEX = new PropertyDescriptor.Builder()
.name("Allowed Paths")
.description("A Regular Expression that specifies the valid HTTP Paths that are allowed in the incoming URL Requests. If this value is specified and the path of the HTTP Requests does not match this Regular Expression, the Processor will respond with a 404: NotFound")
@ -189,6 +199,7 @@ public class HandleHttpRequest extends AbstractProcessor {
descriptors.add(SSL_CONTEXT);
descriptors.add(HTTP_CONTEXT_MAP);
descriptors.add(PATH_REGEX);
descriptors.add(URL_CHARACTER_SET);
descriptors.add(ALLOW_GET);
descriptors.add(ALLOW_POST);
descriptors.add(ALLOW_PUT);
@ -360,7 +371,7 @@ public class HandleHttpRequest extends AbstractProcessor {
protected int getPort() {
for ( final Connector connector : server.getConnectors() ) {
if ( connector instanceof ServerConnector ) {
return ((ServerConnector) connector).getPort();
return ((ServerConnector) connector).getLocalPort();
}
}
@ -421,19 +432,50 @@ public class HandleHttpRequest extends AbstractProcessor {
return;
}
final String charset = context.getProperty(URL_CHARACTER_SET).getValue();
final String contextIdentifier = UUID.randomUUID().toString();
final Map<String, String> attributes = new HashMap<>();
putAttribute(attributes, HTTP_CONTEXT_ID, contextIdentifier);
putAttribute(attributes, "mime.type", request.getContentType());
putAttribute(attributes, "http.servlet.path", request.getServletPath());
putAttribute(attributes, "http.context.path", request.getContextPath());
putAttribute(attributes, "http.method", request.getMethod());
putAttribute(attributes, "http.query.string", request.getQueryString());
putAttribute(attributes, "http.remote.host", request.getRemoteHost());
putAttribute(attributes, "http.remote.addr", request.getRemoteAddr());
putAttribute(attributes, "http.remote.user", request.getRemoteUser());
putAttribute(attributes, "http.request.uri", request.getRequestURI());
putAttribute(attributes, "http.auth.type", request.getAuthType());
try {
putAttribute(attributes, HTTP_CONTEXT_ID, contextIdentifier);
putAttribute(attributes, "mime.type", request.getContentType());
putAttribute(attributes, "http.servlet.path", request.getServletPath());
putAttribute(attributes, "http.context.path", request.getContextPath());
putAttribute(attributes, "http.method", request.getMethod());
if ( request.getQueryString() != null ) {
putAttribute(attributes, "http.query.string", URLDecoder.decode(request.getQueryString(), charset));
}
putAttribute(attributes, "http.remote.host", request.getRemoteHost());
putAttribute(attributes, "http.remote.addr", request.getRemoteAddr());
putAttribute(attributes, "http.remote.user", request.getRemoteUser());
putAttribute(attributes, "http.request.uri", request.getRequestURI());
putAttribute(attributes, "http.auth.type", request.getAuthType());
final String queryString = request.getQueryString();
if ( queryString != null ) {
final String[] params = URL_QUERY_PARAM_DELIMITER.split(queryString);
for ( final String keyValueString : params ) {
final int indexOf = keyValueString.indexOf("=");
if ( indexOf < 0 ) {
// no =, then it's just a key with no value
attributes.put("http.query.param." + URLDecoder.decode(keyValueString, charset), "");
} else {
final String key = keyValueString.substring(0, indexOf);
final String value;
if ( indexOf == keyValueString.length() - 1 ) {
value = "";
} else {
value = keyValueString.substring(indexOf + 1);
}
attributes.put("http.query.param." + URLDecoder.decode(key, charset), URLDecoder.decode(value, charset));
}
}
}
} catch (final UnsupportedEncodingException uee) {
throw new ProcessException("Invalid character encoding", uee); // won't happen because charset has been validated
}
final Enumeration<String> headerNames = request.getHeaderNames();
while ( headerNames.hasMoreElements() ) {

View File

@ -0,0 +1,127 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard;
import static org.junit.Assert.assertEquals;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import javax.servlet.AsyncContext;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.http.HttpContextMap;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.stream.io.NullOutputStream;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Assert;
import org.junit.Test;
public class TestHandleHttpRequest {
@Test
public void testRequestAddedToService() throws InitializationException, MalformedURLException, IOException {
final TestRunner runner = TestRunners.newTestRunner(HandleHttpRequest.class);
runner.setProperty(HandleHttpRequest.PORT, "0");
final MockHttpContextMap contextMap = new MockHttpContextMap();
runner.addControllerService("http-context-map", contextMap);
runner.enableControllerService(contextMap);
runner.setProperty(HandleHttpRequest.HTTP_CONTEXT_MAP, "http-context-map");
// trigger processor to stop but not shutdown.
runner.run(1, false);
try {
final Thread httpThread = new Thread(new Runnable() {
@Override
public void run() {
try {
final int port = ((HandleHttpRequest) runner.getProcessor()).getPort();
final HttpURLConnection connection = (HttpURLConnection) new URL("http://localhost:" + port + "/my/path?query=true&value1=value1&value2=&value3&value4=apple=orange").openConnection();
connection.setDoOutput(false);
connection.setRequestMethod("GET");
connection.setRequestProperty("header1", "value1");
connection.setRequestProperty("header2", "");
connection.setRequestProperty("header3", "apple=orange");
connection.setConnectTimeout(3000);
connection.setReadTimeout(3000);
StreamUtils.copy(connection.getInputStream(), new NullOutputStream());
} catch (final Throwable t) {
t.printStackTrace();
Assert.fail(t.toString());
}
}
});
httpThread.start();
try { Thread.sleep(100L); } catch (final InterruptedException ie) {}
// process the request.
runner.run(1, false);
runner.assertAllFlowFilesTransferred(HandleHttpRequest.REL_SUCCESS, 1);
assertEquals(1, contextMap.size());
final MockFlowFile mff = runner.getFlowFilesForRelationship(HandleHttpRequest.REL_SUCCESS).get(0);
mff.assertAttributeEquals("http.query.param.query", "true");
mff.assertAttributeEquals("http.query.param.value1", "value1");
mff.assertAttributeEquals("http.query.param.value2", "");
mff.assertAttributeEquals("http.query.param.value3", "");
mff.assertAttributeEquals("http.query.param.value4", "apple=orange");
mff.assertAttributeEquals("http.headers.header1", "value1");
mff.assertAttributeEquals("http.headers.header3", "apple=orange");
} finally {
// shut down the server
runner.run(1, true);
}
}
private static class MockHttpContextMap extends AbstractControllerService implements HttpContextMap {
private final ConcurrentMap<String, HttpServletResponse> responseMap = new ConcurrentHashMap<>();
@Override
public boolean register(String identifier, HttpServletRequest request, HttpServletResponse response, AsyncContext context) {
responseMap.put(identifier, response);
return true;
}
@Override
public HttpServletResponse getResponse(String identifier) {
return responseMap.get(identifier);
}
@Override
public void complete(String identifier) {
responseMap.remove(identifier);
}
public int size() {
return responseMap.size();
}
}
}

View File

@ -0,0 +1,172 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
import javax.servlet.AsyncContext;
import javax.servlet.ServletOutputStream;
import javax.servlet.WriteListener;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.http.HttpContextMap;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
public class TestHandleHttpResponse {
@Test
public void testEnsureCompleted() throws InitializationException {
final TestRunner runner = TestRunners.newTestRunner(HandleHttpResponse.class);
final MockHttpContextMap contextMap = new MockHttpContextMap("my-id");
runner.addControllerService("http-context-map", contextMap);
runner.enableControllerService(contextMap);
runner.setProperty(HandleHttpResponse.HTTP_CONTEXT_MAP, "http-context-map");
runner.setProperty(HandleHttpResponse.STATUS_CODE, "${status.code}");
runner.setProperty("my-attr", "${my-attr}");
runner.setProperty("no-valid-attr", "${no-valid-attr}");
final Map<String, String> attributes = new HashMap<>();
attributes.put(HandleHttpResponse.HTTP_CONTEXT_ID, "my-id");
attributes.put("my-attr", "hello");
attributes.put("status.code", "201");
runner.enqueue("hello".getBytes(), attributes);
runner.run();
runner.assertAllFlowFilesTransferred(HandleHttpResponse.REL_SUCCESS, 1);
assertEquals("hello", contextMap.baos.toString());
assertEquals("hello", contextMap.headersSent.get("my-attr"));
assertNull(contextMap.headersSent.get("no-valid-attr"));
assertEquals(201, contextMap.statusCode);
assertEquals(1, contextMap.getCompletionCount());
assertTrue(contextMap.headersWithNoValue.isEmpty());
}
private static class MockHttpContextMap extends AbstractControllerService implements HttpContextMap {
private final String id;
private final AtomicInteger completedCount = new AtomicInteger(0);
private final ByteArrayOutputStream baos = new ByteArrayOutputStream();
private final ConcurrentMap<String, String> headersSent = new ConcurrentHashMap<>();
private volatile int statusCode = -1;
private final List<String> headersWithNoValue = new CopyOnWriteArrayList<>();
public MockHttpContextMap(final String expectedIdentifier) {
this.id = expectedIdentifier;
}
@Override
public boolean register(String identifier, HttpServletRequest request, HttpServletResponse response, AsyncContext context) {
return true;
}
@Override
public HttpServletResponse getResponse(final String identifier) {
if ( !id.equals(identifier) ) {
Assert.fail("attempting to respond to wrong request; should have been " + id + " but was " + identifier);
}
try {
final HttpServletResponse response = Mockito.mock(HttpServletResponse.class);
Mockito.when(response.getOutputStream()).thenReturn(new ServletOutputStream() {
@Override
public boolean isReady() { return true; }
@Override
public void setWriteListener(WriteListener writeListener) {}
@Override
public void write(int b) throws IOException { baos.write(b); }
@Override
public void write(byte[] b) throws IOException { baos.write(b); }
@Override
public void write(byte[] b, int off, int len) throws IOException { baos.write(b, off, len); }
});
Mockito.doAnswer(new Answer<Object>() {
@Override
public Object answer(final InvocationOnMock invocation) throws Throwable {
final String key = invocation.getArgumentAt(0, String.class);
final String value = invocation.getArgumentAt(1, String.class);
if ( value == null ) {
headersWithNoValue.add(key);
} else {
headersSent.put(key, value);
}
return null;
}
}).when(response).setHeader(Mockito.any(String.class), Mockito.any(String.class));
Mockito.doAnswer(new Answer<Object>() {
@Override
public Object answer(final InvocationOnMock invocation) throws Throwable {
statusCode = invocation.getArgumentAt(0, int.class);
return null;
}
}).when(response).setStatus(Mockito.anyInt());
return response;
} catch (final Exception e) {
e.printStackTrace();
Assert.fail(e.toString());
return null;
}
}
@Override
public void complete(final String identifier) {
if ( !id.equals(identifier) ) {
Assert.fail("attempting to respond to wrong request; should have been " + id + " but was " + identifier);
}
completedCount.incrementAndGet();
}
public int getCompletionCount() {
return completedCount.get();
}
}
}