mirror of https://github.com/apache/nifi.git
NIFI-9277: Add Record Reader and Writer to ListenHTTP.
This closes #5446. Signed-off-by: Tamas Palfy <tamas.bertalan.palfy@gmail.com>
This commit is contained in:
parent
0147b15e37
commit
207894ebe0
|
@ -36,7 +36,6 @@ import org.apache.nifi.processor.DataUnit;
|
|||
import org.apache.nifi.processor.ProcessContext;
|
||||
import org.apache.nifi.processor.ProcessSession;
|
||||
import org.apache.nifi.processor.ProcessSessionFactory;
|
||||
import org.apache.nifi.processor.ProcessorInitializationContext;
|
||||
import org.apache.nifi.processor.Relationship;
|
||||
import org.apache.nifi.processor.exception.ProcessException;
|
||||
import org.apache.nifi.processor.util.StandardValidators;
|
||||
|
@ -46,6 +45,8 @@ import org.apache.nifi.processors.standard.servlets.ListenHTTPServlet;
|
|||
import org.apache.nifi.scheduling.ExecutionNode;
|
||||
import org.apache.nifi.security.util.ClientAuth;
|
||||
import org.apache.nifi.security.util.TlsConfiguration;
|
||||
import org.apache.nifi.serialization.RecordReaderFactory;
|
||||
import org.apache.nifi.serialization.RecordSetWriterFactory;
|
||||
import org.apache.nifi.ssl.RestrictedSSLContextService;
|
||||
import org.apache.nifi.ssl.SSLContextService;
|
||||
import org.apache.nifi.stream.io.LeakyBucketStreamThrottler;
|
||||
|
@ -88,13 +89,12 @@ import java.util.stream.Collectors;
|
|||
+ "supported. GET, PUT, and DELETE will result in an error and the HTTP response status code 405. "
|
||||
+ "GET is supported on <service_URI>/healthcheck. If the service is available, it returns \"200 OK\" with the content \"OK\". "
|
||||
+ "The health check functionality can be configured to be accessible via a different port. "
|
||||
+ "For details see the documentation of the \"Listening Port for health check requests\" property.")
|
||||
+ "For details see the documentation of the \"Listening Port for health check requests\" property."
|
||||
+ "A Record Reader and Record Writer property can be enabled on the processor to process incoming requests as records. "
|
||||
+ "Record processing is not allowed for multipart requests and request in FlowFileV3 format (minifi).")
|
||||
public class ListenHTTP extends AbstractSessionFactoryProcessor {
|
||||
private static final String MATCH_ALL = ".*";
|
||||
|
||||
private Set<Relationship> relationships;
|
||||
private List<PropertyDescriptor> properties;
|
||||
|
||||
private final AtomicBoolean initialized = new AtomicBoolean(false);
|
||||
private final AtomicBoolean runOnPrimary = new AtomicBoolean(false);
|
||||
|
||||
|
@ -253,6 +253,46 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor {
|
|||
.defaultValue("200")
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
|
||||
.name("record-reader")
|
||||
.displayName("Record Reader")
|
||||
.description("The Record Reader to use parsing the incoming FlowFile into Records")
|
||||
.required(false)
|
||||
.identifiesControllerService(RecordReaderFactory.class)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
|
||||
.name("record-writer")
|
||||
.displayName("Record Writer")
|
||||
.description("The Record Writer to use for serializing Records after they have been transformed")
|
||||
.required(true)
|
||||
.identifiesControllerService(RecordSetWriterFactory.class)
|
||||
.dependsOn(RECORD_READER)
|
||||
.build();
|
||||
|
||||
protected static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(
|
||||
BASE_PATH,
|
||||
PORT,
|
||||
HEALTH_CHECK_PORT,
|
||||
MAX_DATA_RATE,
|
||||
SSL_CONTEXT_SERVICE,
|
||||
CLIENT_AUTHENTICATION,
|
||||
AUTHORIZED_DN_PATTERN,
|
||||
AUTHORIZED_ISSUER_DN_PATTERN,
|
||||
MAX_UNCONFIRMED_TIME,
|
||||
HEADERS_AS_ATTRIBUTES_REGEX,
|
||||
RETURN_CODE,
|
||||
MULTIPART_REQUEST_MAX_SIZE,
|
||||
MULTIPART_READ_BUFFER_SIZE,
|
||||
MAX_THREAD_POOL_SIZE,
|
||||
RECORD_READER,
|
||||
RECORD_WRITER
|
||||
));
|
||||
|
||||
private static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Collections.singletonList(
|
||||
RELATIONSHIP_SUCCESS
|
||||
)));
|
||||
|
||||
public static final String CONTEXT_ATTRIBUTE_PROCESSOR = "processor";
|
||||
public static final String CONTEXT_ATTRIBUTE_LOGGER = "logger";
|
||||
public static final String CONTEXT_ATTRIBUTE_SESSION_FACTORY_HOLDER = "sessionFactoryHolder";
|
||||
|
@ -274,12 +314,12 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor {
|
|||
private final AtomicReference<StreamThrottler> throttlerRef = new AtomicReference<>();
|
||||
|
||||
@Override
|
||||
protected Collection<ValidationResult> customValidate(ValidationContext context) {
|
||||
List<ValidationResult> results = new ArrayList<>(1);
|
||||
protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
|
||||
final List<ValidationResult> validationResults = new ArrayList<>(super.customValidate(validationContext));
|
||||
|
||||
validatePortsAreNotEqual(context, results);
|
||||
validatePortsAreNotEqual(validationContext, validationResults);
|
||||
|
||||
return results;
|
||||
return validationResults;
|
||||
}
|
||||
|
||||
private void validatePortsAreNotEqual(ValidationContext context, Collection<ValidationResult> validationResults) {
|
||||
|
@ -297,38 +337,14 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor {
|
|||
return new ValidationResult.Builder().subject(subject).valid(false).explanation(explanation).build();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void init(final ProcessorInitializationContext context) {
|
||||
final Set<Relationship> relationships = new HashSet<>();
|
||||
relationships.add(RELATIONSHIP_SUCCESS);
|
||||
this.relationships = Collections.unmodifiableSet(relationships);
|
||||
|
||||
final List<PropertyDescriptor> descriptors = new ArrayList<>();
|
||||
descriptors.add(BASE_PATH);
|
||||
descriptors.add(PORT);
|
||||
descriptors.add(HEALTH_CHECK_PORT);
|
||||
descriptors.add(MAX_DATA_RATE);
|
||||
descriptors.add(SSL_CONTEXT_SERVICE);
|
||||
descriptors.add(CLIENT_AUTHENTICATION);
|
||||
descriptors.add(AUTHORIZED_DN_PATTERN);
|
||||
descriptors.add(AUTHORIZED_ISSUER_DN_PATTERN);
|
||||
descriptors.add(MAX_UNCONFIRMED_TIME);
|
||||
descriptors.add(HEADERS_AS_ATTRIBUTES_REGEX);
|
||||
descriptors.add(RETURN_CODE);
|
||||
descriptors.add(MULTIPART_REQUEST_MAX_SIZE);
|
||||
descriptors.add(MULTIPART_READ_BUFFER_SIZE);
|
||||
descriptors.add(MAX_THREAD_POOL_SIZE);
|
||||
this.properties = Collections.unmodifiableList(descriptors);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Set<Relationship> getRelationships() {
|
||||
return relationships;
|
||||
return RELATIONSHIPS;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
||||
return properties;
|
||||
return PROPERTIES;
|
||||
}
|
||||
|
||||
@OnStopped
|
||||
|
|
|
@ -0,0 +1,31 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.processors.standard.exception;
|
||||
|
||||
public class ListenHttpException extends RuntimeException {
|
||||
|
||||
private final int returnCode;
|
||||
|
||||
public ListenHttpException(final String message, final Throwable cause, final int returnCode) {
|
||||
super(message, cause);
|
||||
this.returnCode = returnCode;
|
||||
}
|
||||
|
||||
public int getReturnCode() {
|
||||
return returnCode;
|
||||
}
|
||||
}
|
|
@ -27,9 +27,16 @@ import org.apache.nifi.logging.ComponentLog;
|
|||
import org.apache.nifi.processor.ProcessContext;
|
||||
import org.apache.nifi.processor.ProcessSession;
|
||||
import org.apache.nifi.processor.ProcessSessionFactory;
|
||||
import org.apache.nifi.processor.io.OutputStreamCallback;
|
||||
import org.apache.nifi.processors.standard.ListenHTTP;
|
||||
import org.apache.nifi.processors.standard.ListenHTTP.FlowFileEntryTimeWrapper;
|
||||
import org.apache.nifi.processors.standard.exception.ListenHttpException;
|
||||
import org.apache.nifi.schema.access.SchemaNotFoundException;
|
||||
import org.apache.nifi.serialization.MalformedRecordException;
|
||||
import org.apache.nifi.serialization.RecordReader;
|
||||
import org.apache.nifi.serialization.RecordReaderFactory;
|
||||
import org.apache.nifi.serialization.RecordSetWriter;
|
||||
import org.apache.nifi.serialization.RecordSetWriterFactory;
|
||||
import org.apache.nifi.serialization.record.RecordSet;
|
||||
import org.apache.nifi.stream.io.StreamThrottler;
|
||||
import org.apache.nifi.stream.io.StreamUtils;
|
||||
import org.apache.nifi.util.FlowFileUnpackager;
|
||||
|
@ -49,11 +56,12 @@ import javax.servlet.http.HttpServletResponse;
|
|||
import javax.servlet.http.Part;
|
||||
import javax.ws.rs.Path;
|
||||
import javax.ws.rs.core.MediaType;
|
||||
import java.io.BufferedInputStream;
|
||||
import java.io.BufferedOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
import java.io.UnsupportedEncodingException;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.security.cert.X509Certificate;
|
||||
import java.util.Enumeration;
|
||||
import java.util.HashMap;
|
||||
|
@ -110,6 +118,8 @@ public class ListenHTTPServlet extends HttpServlet {
|
|||
private long multipartRequestMaxSize;
|
||||
private int multipartReadBufferSize;
|
||||
private int port;
|
||||
private RecordReaderFactory readerFactory;
|
||||
private RecordSetWriterFactory writerFactory;
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Override
|
||||
|
@ -128,6 +138,8 @@ public class ListenHTTPServlet extends HttpServlet {
|
|||
this.multipartRequestMaxSize = (long) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_MULTIPART_REQUEST_MAX_SIZE);
|
||||
this.multipartReadBufferSize = (int) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_MULTIPART_READ_BUFFER_SIZE);
|
||||
this.port = (int) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_PORT);
|
||||
this.readerFactory = processContext.getProperty(ListenHTTP.RECORD_READER).asControllerService(RecordReaderFactory.class);
|
||||
this.writerFactory = processContext.getProperty(ListenHTTP.RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -244,11 +256,19 @@ public class ListenHTTPServlet extends HttpServlet {
|
|||
final ProcessSession session, final String foundSubject, final String foundIssuer, final Throwable t) throws IOException {
|
||||
session.rollback();
|
||||
logger.error("Unable to receive file from Remote Host: [{}] SubjectDN [{}] IssuerDN [{}] due to {}", request.getRemoteHost(), foundSubject, foundIssuer, t);
|
||||
response.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, t.toString());
|
||||
if (t instanceof ListenHttpException) {
|
||||
final int returnCode = ((ListenHttpException) t).getReturnCode();
|
||||
response.sendError(returnCode, t.toString());
|
||||
} else {
|
||||
response.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, t.toString());
|
||||
}
|
||||
}
|
||||
|
||||
private Set<FlowFile> handleMultipartRequest(HttpServletRequest request, ProcessSession session, String foundSubject, String foundIssuer)
|
||||
throws IOException, IllegalStateException, ServletException {
|
||||
if (isRecordProcessing()) {
|
||||
logger.debug("Record processing will not be utilized while processing multipart request. Request URI: {}", request.getRequestURI());
|
||||
}
|
||||
Set<FlowFile> flowFileSet = new HashSet<>();
|
||||
String tempDir = System.getProperty("java.io.tmpdir");
|
||||
request.setAttribute(Request.MULTIPART_CONFIG_ELEMENT, new MultipartConfigElement(tempDir, multipartRequestMaxSize, multipartRequestMaxSize, multipartReadBufferSize));
|
||||
|
@ -281,21 +301,12 @@ public class ListenHTTPServlet extends HttpServlet {
|
|||
return session.putAllAttributes(flowFile, attributes);
|
||||
}
|
||||
|
||||
private Set<FlowFile> handleRequest(final HttpServletRequest request, final ProcessSession session,
|
||||
String foundSubject, String foundIssuer, final boolean destinationIsLegacyNiFi, final String contentType, final InputStream in) {
|
||||
FlowFile flowFile = null;
|
||||
private Set<FlowFile> handleRequest(final HttpServletRequest request, final ProcessSession session, String foundSubject, String foundIssuer,
|
||||
final boolean destinationIsLegacyNiFi, final String contentType, final InputStream in) throws IOException {
|
||||
FlowFile flowFile;
|
||||
String holdUuid = null;
|
||||
final AtomicBoolean hasMoreData = new AtomicBoolean(false);
|
||||
final FlowFileUnpackager unpackager;
|
||||
if (StandardFlowFileMediaType.VERSION_3.getMediaType().equals(contentType)) {
|
||||
unpackager = new FlowFileUnpackagerV3();
|
||||
} else if (StandardFlowFileMediaType.VERSION_2.getMediaType().equals(contentType)) {
|
||||
unpackager = new FlowFileUnpackagerV2();
|
||||
} else if (StringUtils.startsWith(contentType, StandardFlowFileMediaType.VERSION_UNSPECIFIED.getMediaType())) {
|
||||
unpackager = new FlowFileUnpackagerV1();
|
||||
} else {
|
||||
unpackager = null;
|
||||
}
|
||||
final FlowFileUnpackager unpackager = getFlowFileUnpackager(contentType);
|
||||
|
||||
final Set<FlowFile> flowFileSet = new HashSet<>();
|
||||
|
||||
|
@ -303,32 +314,38 @@ public class ListenHTTPServlet extends HttpServlet {
|
|||
final long startNanos = System.nanoTime();
|
||||
final Map<String, String> attributes = new HashMap<>();
|
||||
flowFile = session.create();
|
||||
flowFile = session.write(flowFile, new OutputStreamCallback() {
|
||||
@Override
|
||||
public void process(final OutputStream rawOut) throws IOException {
|
||||
try (final BufferedOutputStream bos = new BufferedOutputStream(rawOut, 65536)) {
|
||||
if (unpackager == null) {
|
||||
IOUtils.copy(in, bos);
|
||||
hasMoreData.set(false);
|
||||
} else {
|
||||
attributes.putAll(unpackager.unpackageFlowFile(in, bos));
|
||||
|
||||
if (destinationIsLegacyNiFi) {
|
||||
if (attributes.containsKey("nf.file.name")) {
|
||||
// for backward compatibility with old nifi...
|
||||
attributes.put(CoreAttributes.FILENAME.key(), attributes.remove("nf.file.name"));
|
||||
}
|
||||
final OutputStream out = session.write(flowFile);
|
||||
|
||||
if (attributes.containsKey("nf.file.path")) {
|
||||
attributes.put(CoreAttributes.PATH.key(), attributes.remove("nf.file.path"));
|
||||
}
|
||||
}
|
||||
try (final BufferedOutputStream bos = new BufferedOutputStream(out, 65536)) {
|
||||
if (unpackager == null) {
|
||||
if (isRecordProcessing()) {
|
||||
processRecord(in, flowFile, out);
|
||||
} else {
|
||||
IOUtils.copy(in, bos);
|
||||
hasMoreData.set(false);
|
||||
}
|
||||
} else {
|
||||
if (isRecordProcessing()) {
|
||||
logger.debug("Record processing will not be utilized while processing with unpackager. Request URI: {}", request.getRequestURI());
|
||||
}
|
||||
attributes.putAll(unpackager.unpackageFlowFile(in, bos));
|
||||
|
||||
hasMoreData.set(unpackager.hasMoreData());
|
||||
if (destinationIsLegacyNiFi) {
|
||||
if (attributes.containsKey("nf.file.name")) {
|
||||
// for backward compatibility with old nifi...
|
||||
attributes.put(CoreAttributes.FILENAME.key(), attributes.remove("nf.file.name"));
|
||||
}
|
||||
|
||||
if (attributes.containsKey("nf.file.path")) {
|
||||
attributes.put(CoreAttributes.PATH.key(), attributes.remove("nf.file.path"));
|
||||
}
|
||||
}
|
||||
|
||||
hasMoreData.set(unpackager.hasMoreData());
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
final long transferNanos = System.nanoTime() - startNanos;
|
||||
final long transferMillis = TimeUnit.MILLISECONDS.convert(transferNanos, TimeUnit.NANOSECONDS);
|
||||
|
@ -361,33 +378,9 @@ public class ListenHTTPServlet extends HttpServlet {
|
|||
return flowFileSet;
|
||||
}
|
||||
|
||||
protected FlowFile saveRequestDetailsAsAttributes(final HttpServletRequest request, final ProcessSession session,
|
||||
final String foundSubject, final String foundIssuer, FlowFile flowFile) {
|
||||
Map<String, String> attributes = new HashMap<>();
|
||||
addMatchingRequestHeaders(request, attributes);
|
||||
flowFile = session.putAllAttributes(flowFile, attributes);
|
||||
flowFile = session.putAttribute(flowFile, "restlistener.remote.source.host", request.getRemoteHost());
|
||||
flowFile = session.putAttribute(flowFile, "restlistener.request.uri", request.getRequestURI());
|
||||
flowFile = session.putAttribute(flowFile, "restlistener.remote.user.dn", foundSubject);
|
||||
flowFile = session.putAttribute(flowFile, "restlistener.remote.issuer.dn", foundIssuer);
|
||||
return flowFile;
|
||||
}
|
||||
|
||||
private void addMatchingRequestHeaders(final HttpServletRequest request, final Map<String, String> attributes) {
|
||||
// put arbitrary headers on flow file
|
||||
for (Enumeration<String> headerEnum = request.getHeaderNames();
|
||||
headerEnum.hasMoreElements(); ) {
|
||||
String headerName = headerEnum.nextElement();
|
||||
if (headerPattern != null && headerPattern.matcher(headerName).matches()) {
|
||||
String headerValue = request.getHeader(headerName);
|
||||
attributes.put(headerName, headerValue);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
protected void proceedFlow(final HttpServletRequest request, final HttpServletResponse response,
|
||||
final ProcessSession session, final String foundSubject, final String foundIssuer, final boolean createHold,
|
||||
final Set<FlowFile> flowFileSet) throws IOException, UnsupportedEncodingException {
|
||||
final Set<FlowFile> flowFileSet) throws IOException {
|
||||
if (createHold) {
|
||||
String uuid = UUID.randomUUID().toString();
|
||||
|
||||
|
@ -408,7 +401,7 @@ public class ListenHTTPServlet extends HttpServlet {
|
|||
final String ackUri = "/" + basePath + "/holds/" + uuid;
|
||||
response.addHeader(LOCATION_HEADER_NAME, ackUri);
|
||||
response.addHeader(LOCATION_URI_INTENT_NAME, LOCATION_URI_INTENT_VALUE);
|
||||
response.getOutputStream().write(ackUri.getBytes("UTF-8"));
|
||||
response.getOutputStream().write(ackUri.getBytes(StandardCharsets.UTF_8));
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("Ingested {} from Remote Host: [{}] Port [{}] SubjectDN [{}] IssuerDN [{}]; placed hold on these {} files with ID {}",
|
||||
flowFileSet, request.getRemoteHost(), request.getRemotePort(), foundSubject, foundIssuer, flowFileSet.size(), uuid);
|
||||
|
@ -433,6 +426,59 @@ public class ListenHTTPServlet extends HttpServlet {
|
|||
}
|
||||
}
|
||||
|
||||
protected FlowFile saveRequestDetailsAsAttributes(final HttpServletRequest request, final ProcessSession session,
|
||||
final String foundSubject, final String foundIssuer, FlowFile flowFile) {
|
||||
Map<String, String> attributes = new HashMap<>();
|
||||
addMatchingRequestHeaders(request, attributes);
|
||||
flowFile = session.putAllAttributes(flowFile, attributes);
|
||||
flowFile = session.putAttribute(flowFile, "restlistener.remote.source.host", request.getRemoteHost());
|
||||
flowFile = session.putAttribute(flowFile, "restlistener.request.uri", request.getRequestURI());
|
||||
flowFile = session.putAttribute(flowFile, "restlistener.remote.user.dn", foundSubject);
|
||||
flowFile = session.putAttribute(flowFile, "restlistener.remote.issuer.dn", foundIssuer);
|
||||
return flowFile;
|
||||
}
|
||||
|
||||
private void processRecord(InputStream in, FlowFile flowFile, OutputStream out) {
|
||||
try (final RecordReader reader = readerFactory.createRecordReader(flowFile, new BufferedInputStream(in), logger)) {
|
||||
final RecordSet recordSet = reader.createRecordSet();
|
||||
try (final RecordSetWriter writer = writerFactory.createWriter(logger, reader.getSchema(), out, flowFile)) {
|
||||
writer.write(recordSet);
|
||||
}
|
||||
} catch (IOException | MalformedRecordException e) {
|
||||
throw new ListenHttpException("Could not process record.", e, HttpServletResponse.SC_BAD_REQUEST);
|
||||
} catch (SchemaNotFoundException e) {
|
||||
throw new ListenHttpException("Could not find schema.", e, HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
|
||||
}
|
||||
}
|
||||
|
||||
private FlowFileUnpackager getFlowFileUnpackager(String contentType) {
|
||||
final FlowFileUnpackager unpackager;
|
||||
if (StandardFlowFileMediaType.VERSION_3.getMediaType().equals(contentType)) {
|
||||
unpackager = new FlowFileUnpackagerV3();
|
||||
} else if (StandardFlowFileMediaType.VERSION_2.getMediaType().equals(contentType)) {
|
||||
unpackager = new FlowFileUnpackagerV2();
|
||||
} else if (StringUtils.startsWith(contentType, StandardFlowFileMediaType.VERSION_UNSPECIFIED.getMediaType())) {
|
||||
unpackager = new FlowFileUnpackagerV1();
|
||||
} else {
|
||||
unpackager = null;
|
||||
}
|
||||
return unpackager;
|
||||
}
|
||||
|
||||
private void addMatchingRequestHeaders(final HttpServletRequest request, final Map<String, String> attributes) {
|
||||
// put arbitrary headers on flow file
|
||||
for (Enumeration<String> headerEnum = request.getHeaderNames();
|
||||
headerEnum.hasMoreElements(); ) {
|
||||
String headerName = headerEnum.nextElement();
|
||||
if (headerPattern != null && headerPattern.matcher(headerName).matches()) {
|
||||
String headerValue = request.getHeader(headerName);
|
||||
attributes.put(headerName, headerValue);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
private void putAttribute(final Map<String, String> map, final String key, final Object value) {
|
||||
if (value == null) {
|
||||
return;
|
||||
|
@ -448,4 +494,8 @@ public class ListenHTTPServlet extends HttpServlet {
|
|||
|
||||
map.put(key, value);
|
||||
}
|
||||
|
||||
private boolean isRecordProcessing() {
|
||||
return readerFactory != null && writerFactory != null;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -27,6 +27,7 @@ import java.security.GeneralSecurityException;
|
|||
import java.time.Duration;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.Random;
|
||||
|
@ -52,6 +53,9 @@ import org.apache.nifi.security.util.SslContextFactory;
|
|||
import org.apache.nifi.security.util.StandardTlsConfiguration;
|
||||
import org.apache.nifi.security.util.TemporaryKeyStoreBuilder;
|
||||
import org.apache.nifi.security.util.TlsConfiguration;
|
||||
import org.apache.nifi.serialization.record.MockRecordParser;
|
||||
import org.apache.nifi.serialization.record.MockRecordWriter;
|
||||
import org.apache.nifi.serialization.record.RecordFieldType;
|
||||
import org.apache.nifi.ssl.RestrictedSSLContextService;
|
||||
import org.apache.nifi.ssl.SSLContextService;
|
||||
import org.apache.nifi.util.MockFlowFile;
|
||||
|
@ -461,6 +465,71 @@ public class TestListenHTTP {
|
|||
assertEquals(maxThreadPoolSize, sizedThreadPool.getMaxThreads());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPOSTRequestsReceivedWithRecordReader() throws Exception {
|
||||
final MockRecordParser parser = setupRecordReaderTest();
|
||||
|
||||
parser.addSchemaField("id", RecordFieldType.INT);
|
||||
parser.addSchemaField("name", RecordFieldType.STRING);
|
||||
parser.addSchemaField("code", RecordFieldType.LONG);
|
||||
|
||||
final List<Integer> keys = Arrays.asList(1, 2, 3, 4);
|
||||
final List<String> names = Arrays.asList("rec1", "rec2", "rec3", "rec4");
|
||||
final List<Long> codes = Arrays.asList(101L, 102L, 103L, 104L);
|
||||
|
||||
for (int i = 0; i < keys.size(); i++) {
|
||||
parser.addRecord(keys.get(i), names.get(i), codes.get(i));
|
||||
}
|
||||
|
||||
final String expectedMessage =
|
||||
"\"1\",\"rec1\",\"101\"\n" +
|
||||
"\"2\",\"rec2\",\"102\"\n" +
|
||||
"\"3\",\"rec3\",\"103\"\n" +
|
||||
"\"4\",\"rec4\",\"104\"\n";
|
||||
|
||||
startWebServerAndSendMessages(Collections.singletonList(""), HttpServletResponse.SC_OK, false, false);
|
||||
List<MockFlowFile> mockFlowFiles = runner.getFlowFilesForRelationship(RELATIONSHIP_SUCCESS);
|
||||
|
||||
runner.assertTransferCount(RELATIONSHIP_SUCCESS, 1);
|
||||
mockFlowFiles.get(0).assertContentEquals(expectedMessage);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReturn400WhenInvalidPOSTRequestSentWithRecordReader() throws Exception {
|
||||
final MockRecordParser parser = setupRecordReaderTest();
|
||||
parser.failAfter(2);
|
||||
|
||||
parser.addSchemaField("id", RecordFieldType.INT);
|
||||
parser.addSchemaField("name", RecordFieldType.STRING);
|
||||
parser.addSchemaField("code", RecordFieldType.LONG);
|
||||
|
||||
final List<Integer> keys = Arrays.asList(1, 2, 3, 4);
|
||||
final List<String> names = Arrays.asList("rec1", "rec2", "rec3", "rec4");
|
||||
final List<Long> codes = Arrays.asList(101L, 102L, 103L, 104L);
|
||||
|
||||
for (int i = 0; i < keys.size(); i++) {
|
||||
parser.addRecord(keys.get(i), names.get(i), codes.get(i));
|
||||
}
|
||||
|
||||
startWebServerAndSendMessages(Collections.singletonList(""), HttpServletResponse.SC_BAD_REQUEST, false, false);
|
||||
|
||||
runner.assertTransferCount(RELATIONSHIP_SUCCESS, 0);
|
||||
}
|
||||
|
||||
private MockRecordParser setupRecordReaderTest() throws InitializationException {
|
||||
final MockRecordParser parser = new MockRecordParser();
|
||||
final MockRecordWriter writer = new MockRecordWriter();
|
||||
|
||||
runner.addControllerService("mockRecordParser", parser);
|
||||
runner.setProperty(ListenHTTP.RECORD_READER, "mockRecordParser");
|
||||
runner.setProperty(ListenHTTP.PORT, Integer.toString(availablePort));
|
||||
runner.setProperty(ListenHTTP.BASE_PATH, HTTP_BASE_PATH);
|
||||
runner.addControllerService("mockRecordWriter", writer);
|
||||
runner.setProperty(ListenHTTP.RECORD_WRITER, "mockRecordWriter");
|
||||
|
||||
return parser;
|
||||
}
|
||||
|
||||
private void startSecureServer() {
|
||||
runner.setProperty(ListenHTTP.PORT, Integer.toString(availablePort));
|
||||
runner.setProperty(ListenHTTP.BASE_PATH, HTTP_BASE_PATH);
|
||||
|
|
Loading…
Reference in New Issue