mirror of https://github.com/apache/nifi.git
Merge branch 'develop' of http://git-wip-us.apache.org/repos/asf/incubator-nifi into develop
This commit is contained in:
commit
13cd2a67c8
|
@ -25,9 +25,6 @@
|
||||||
|
|
||||||
<artifactId>nifi-hl7-query-language</artifactId>
|
<artifactId>nifi-hl7-query-language</artifactId>
|
||||||
<packaging>jar</packaging>
|
<packaging>jar</packaging>
|
||||||
|
|
||||||
<name>NiFi Health Level 7 (HL7) Query Language</name>
|
|
||||||
|
|
||||||
<build>
|
<build>
|
||||||
<plugins>
|
<plugins>
|
||||||
<plugin>
|
<plugin>
|
||||||
|
|
|
@ -48,18 +48,22 @@ public class DocsReader {
|
||||||
return Collections.emptySet();
|
return Collections.emptySet();
|
||||||
}
|
}
|
||||||
|
|
||||||
final List<Document> docs = new ArrayList<>();
|
final int numDocs = Math.min(topDocs.scoreDocs.length, maxResults);
|
||||||
|
final List<Document> docs = new ArrayList<>(numDocs);
|
||||||
|
|
||||||
for (ScoreDoc scoreDoc : topDocs.scoreDocs) {
|
for (final ScoreDoc scoreDoc : topDocs.scoreDocs) {
|
||||||
final int docId = scoreDoc.doc;
|
final int docId = scoreDoc.doc;
|
||||||
final Document d = indexReader.document(docId);
|
final Document d = indexReader.document(docId);
|
||||||
docs.add(d);
|
docs.add(d);
|
||||||
|
if ( retrievalCount.incrementAndGet() >= maxResults ) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return read(docs, allProvenanceLogFiles, retrievalCount, maxResults);
|
return read(docs, allProvenanceLogFiles);
|
||||||
}
|
}
|
||||||
|
|
||||||
public Set<ProvenanceEventRecord> read(final List<Document> docs, final Collection<Path> allProvenanceLogFiles, final AtomicInteger retrievalCount, final int maxResults) throws IOException {
|
public Set<ProvenanceEventRecord> read(final List<Document> docs, final Collection<Path> allProvenanceLogFiles) throws IOException {
|
||||||
LuceneUtil.sortDocsForRetrieval(docs);
|
LuceneUtil.sortDocsForRetrieval(docs);
|
||||||
|
|
||||||
RecordReader reader = null;
|
RecordReader reader = null;
|
||||||
|
@ -79,9 +83,6 @@ public class DocsReader {
|
||||||
reader.skipTo(byteOffset);
|
reader.skipTo(byteOffset);
|
||||||
final StandardProvenanceEventRecord record = reader.nextRecord();
|
final StandardProvenanceEventRecord record = reader.nextRecord();
|
||||||
matchingRecords.add(record);
|
matchingRecords.add(record);
|
||||||
if (retrievalCount.incrementAndGet() >= maxResults) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
} catch (final IOException e) {
|
} catch (final IOException e) {
|
||||||
throw new FileNotFoundException("Could not find Provenance Log File with basename " + storageFilename + " in the Provenance Repository");
|
throw new FileNotFoundException("Could not find Provenance Log File with basename " + storageFilename + " in the Provenance Repository");
|
||||||
}
|
}
|
||||||
|
@ -91,7 +92,7 @@ public class DocsReader {
|
||||||
reader.close();
|
reader.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
final List<File> potentialFiles = LuceneUtil.getProvenanceLogFiles(storageFilename, allProvenanceLogFiles);
|
List<File> potentialFiles = LuceneUtil.getProvenanceLogFiles(storageFilename, allProvenanceLogFiles);
|
||||||
if (potentialFiles.isEmpty()) {
|
if (potentialFiles.isEmpty()) {
|
||||||
throw new FileNotFoundException("Could not find Provenance Log File with basename " + storageFilename + " in the Provenance Repository");
|
throw new FileNotFoundException("Could not find Provenance Log File with basename " + storageFilename + " in the Provenance Repository");
|
||||||
}
|
}
|
||||||
|
@ -108,9 +109,6 @@ public class DocsReader {
|
||||||
|
|
||||||
final StandardProvenanceEventRecord record = reader.nextRecord();
|
final StandardProvenanceEventRecord record = reader.nextRecord();
|
||||||
matchingRecords.add(record);
|
matchingRecords.add(record);
|
||||||
if (retrievalCount.incrementAndGet() >= maxResults) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
} catch (final IOException e) {
|
} catch (final IOException e) {
|
||||||
throw new IOException("Failed to retrieve record from Provenance File " + file + " due to " + e, e);
|
throw new IOException("Failed to retrieve record from Provenance File " + file + " due to " + e, e);
|
||||||
}
|
}
|
||||||
|
|
|
@ -33,6 +33,8 @@ import org.apache.nifi.provenance.lucene.LuceneUtil;
|
||||||
public class RecordReaders {
|
public class RecordReaders {
|
||||||
|
|
||||||
public static RecordReader newRecordReader(File file, final Collection<Path> provenanceLogFiles) throws IOException {
|
public static RecordReader newRecordReader(File file, final Collection<Path> provenanceLogFiles) throws IOException {
|
||||||
|
final File originalFile = file;
|
||||||
|
|
||||||
if (!file.exists()) {
|
if (!file.exists()) {
|
||||||
if (provenanceLogFiles == null) {
|
if (provenanceLogFiles == null) {
|
||||||
throw new FileNotFoundException(file.toString());
|
throw new FileNotFoundException(file.toString());
|
||||||
|
@ -47,11 +49,44 @@ public class RecordReaders {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (file == null || !file.exists()) {
|
InputStream fis = null;
|
||||||
throw new FileNotFoundException(file.toString());
|
if ( file.exists() ) {
|
||||||
|
try {
|
||||||
|
fis = new FileInputStream(file);
|
||||||
|
} catch (final FileNotFoundException fnfe) {
|
||||||
|
fis = null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
openStream: while ( fis == null ) {
|
||||||
|
final File dir = file.getParentFile();
|
||||||
|
final String baseName = LuceneUtil.substringBefore(file.getName(), ".");
|
||||||
|
|
||||||
|
// depending on which rollover actions have occurred, we could have 3 possibilities for the
|
||||||
|
// filename that we need. The majority of the time, we will use the extension ".prov.indexed.gz"
|
||||||
|
// because most often we are compressing on rollover and most often we have already finished
|
||||||
|
// compressing by the time that we are querying the data.
|
||||||
|
for ( final String extension : new String[] {".indexed.prov.gz", ".indexed.prov", ".prov"} ) {
|
||||||
|
file = new File(dir, baseName + extension);
|
||||||
|
if ( file.exists() ) {
|
||||||
|
try {
|
||||||
|
fis = new FileInputStream(file);
|
||||||
|
break openStream;
|
||||||
|
} catch (final FileNotFoundException fnfe) {
|
||||||
|
// file was modified by a RolloverAction after we verified that it exists but before we could
|
||||||
|
// create an InputStream for it. Start over.
|
||||||
|
fis = null;
|
||||||
|
continue openStream;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
final InputStream fis = new FileInputStream(file);
|
if ( fis == null ) {
|
||||||
|
throw new FileNotFoundException("Unable to locate file " + originalFile);
|
||||||
|
}
|
||||||
final InputStream readableStream;
|
final InputStream readableStream;
|
||||||
if (file.getName().endsWith(".gz")) {
|
if (file.getName().endsWith(".gz")) {
|
||||||
readableStream = new BufferedInputStream(new GZIPInputStream(fis));
|
readableStream = new BufferedInputStream(new GZIPInputStream(fis));
|
||||||
|
|
|
@ -22,6 +22,7 @@ import com.jayway.jsonpath.JsonPath;
|
||||||
import com.jayway.jsonpath.internal.spi.json.JsonSmartJsonProvider;
|
import com.jayway.jsonpath.internal.spi.json.JsonSmartJsonProvider;
|
||||||
import com.jayway.jsonpath.spi.json.JsonProvider;
|
import com.jayway.jsonpath.spi.json.JsonProvider;
|
||||||
import net.minidev.json.parser.JSONParser;
|
import net.minidev.json.parser.JSONParser;
|
||||||
|
import org.apache.nifi.components.PropertyDescriptor;
|
||||||
import org.apache.nifi.components.ValidationContext;
|
import org.apache.nifi.components.ValidationContext;
|
||||||
import org.apache.nifi.components.ValidationResult;
|
import org.apache.nifi.components.ValidationResult;
|
||||||
import org.apache.nifi.components.Validator;
|
import org.apache.nifi.components.Validator;
|
||||||
|
@ -35,8 +36,10 @@ import org.apache.nifi.util.ObjectHolder;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InputStream;
|
import java.io.InputStream;
|
||||||
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.Objects;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Provides common functionality used for processors interacting and manipulating JSON data via JsonPath.
|
* Provides common functionality used for processors interacting and manipulating JSON data via JsonPath.
|
||||||
|
@ -51,6 +54,24 @@ public abstract class AbstractJsonPathProcessor extends AbstractProcessor {
|
||||||
|
|
||||||
private static final JsonProvider JSON_PROVIDER = STRICT_PROVIDER_CONFIGURATION.jsonProvider();
|
private static final JsonProvider JSON_PROVIDER = STRICT_PROVIDER_CONFIGURATION.jsonProvider();
|
||||||
|
|
||||||
|
static final Map<String, String> NULL_REPRESENTATION_MAP = new HashMap<>();
|
||||||
|
|
||||||
|
static final String EMPTY_STRING_OPTION = "empty string";
|
||||||
|
static final String NULL_STRING_OPTION = "the string 'null'";
|
||||||
|
|
||||||
|
static {
|
||||||
|
NULL_REPRESENTATION_MAP.put(EMPTY_STRING_OPTION, "");
|
||||||
|
NULL_REPRESENTATION_MAP.put(NULL_STRING_OPTION, "null");
|
||||||
|
}
|
||||||
|
|
||||||
|
public static final PropertyDescriptor NULL_VALUE_DEFAULT_REPRESENTATION = new PropertyDescriptor.Builder()
|
||||||
|
.name("Null Value Representation")
|
||||||
|
.description("Indicates the desired representation of JSON Path expressions resulting in a null value.")
|
||||||
|
.required(true)
|
||||||
|
.allowableValues(NULL_REPRESENTATION_MAP.keySet())
|
||||||
|
.defaultValue(EMPTY_STRING_OPTION)
|
||||||
|
.build();
|
||||||
|
|
||||||
static DocumentContext validateAndEstablishJsonContext(ProcessSession processSession, FlowFile flowFile) {
|
static DocumentContext validateAndEstablishJsonContext(ProcessSession processSession, FlowFile flowFile) {
|
||||||
// Parse the document once into an associated context to support multiple path evaluations if specified
|
// Parse the document once into an associated context to support multiple path evaluations if specified
|
||||||
final ObjectHolder<DocumentContext> contextHolder = new ObjectHolder<>(null);
|
final ObjectHolder<DocumentContext> contextHolder = new ObjectHolder<>(null);
|
||||||
|
@ -79,9 +100,9 @@ public abstract class AbstractJsonPathProcessor extends AbstractProcessor {
|
||||||
return !(obj instanceof Map || obj instanceof List);
|
return !(obj instanceof Map || obj instanceof List);
|
||||||
}
|
}
|
||||||
|
|
||||||
static String getResultRepresentation(Object jsonPathResult) {
|
static String getResultRepresentation(Object jsonPathResult, String defaultValue) {
|
||||||
if (isJsonScalar(jsonPathResult)) {
|
if (isJsonScalar(jsonPathResult)) {
|
||||||
return jsonPathResult.toString();
|
return Objects.toString(jsonPathResult, defaultValue);
|
||||||
}
|
}
|
||||||
return JSON_PROVIDER.toJson(jsonPathResult);
|
return JSON_PROVIDER.toJson(jsonPathResult);
|
||||||
}
|
}
|
||||||
|
|
|
@ -16,20 +16,10 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.nifi.processors.standard;
|
package org.apache.nifi.processors.standard;
|
||||||
|
|
||||||
import java.io.IOException;
|
import com.jayway.jsonpath.DocumentContext;
|
||||||
import java.io.OutputStream;
|
import com.jayway.jsonpath.InvalidJsonException;
|
||||||
import java.nio.charset.StandardCharsets;
|
import com.jayway.jsonpath.JsonPath;
|
||||||
import java.util.ArrayList;
|
import com.jayway.jsonpath.PathNotFoundException;
|
||||||
import java.util.Collection;
|
|
||||||
import java.util.Collections;
|
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.HashSet;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.Map;
|
|
||||||
import java.util.Set;
|
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
|
||||||
import java.util.concurrent.ConcurrentMap;
|
|
||||||
|
|
||||||
import org.apache.commons.lang3.StringUtils;
|
import org.apache.commons.lang3.StringUtils;
|
||||||
import org.apache.nifi.annotation.behavior.DynamicProperty;
|
import org.apache.nifi.annotation.behavior.DynamicProperty;
|
||||||
import org.apache.nifi.annotation.behavior.EventDriven;
|
import org.apache.nifi.annotation.behavior.EventDriven;
|
||||||
|
@ -52,10 +42,12 @@ import org.apache.nifi.processor.io.OutputStreamCallback;
|
||||||
import org.apache.nifi.stream.io.BufferedOutputStream;
|
import org.apache.nifi.stream.io.BufferedOutputStream;
|
||||||
import org.apache.nifi.util.ObjectHolder;
|
import org.apache.nifi.util.ObjectHolder;
|
||||||
|
|
||||||
import com.jayway.jsonpath.DocumentContext;
|
import java.io.IOException;
|
||||||
import com.jayway.jsonpath.InvalidJsonException;
|
import java.io.OutputStream;
|
||||||
import com.jayway.jsonpath.JsonPath;
|
import java.nio.charset.StandardCharsets;
|
||||||
import com.jayway.jsonpath.PathNotFoundException;
|
import java.util.*;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
import java.util.concurrent.ConcurrentMap;
|
||||||
|
|
||||||
@EventDriven
|
@EventDriven
|
||||||
@SideEffectFree
|
@SideEffectFree
|
||||||
|
@ -72,7 +64,7 @@ import com.jayway.jsonpath.PathNotFoundException;
|
||||||
+ "If Destination is 'flowfile-content' and the JsonPath does not evaluate to a defined path, the FlowFile will be routed to 'unmatched' without having its contents modified. "
|
+ "If Destination is 'flowfile-content' and the JsonPath does not evaluate to a defined path, the FlowFile will be routed to 'unmatched' without having its contents modified. "
|
||||||
+ "If Destination is flowfile-attribute and the expression matches nothing, attributes will be created with "
|
+ "If Destination is flowfile-attribute and the expression matches nothing, attributes will be created with "
|
||||||
+ "empty strings as the value, and the FlowFile will always be routed to 'matched.'")
|
+ "empty strings as the value, and the FlowFile will always be routed to 'matched.'")
|
||||||
@DynamicProperty(name="A FlowFile attribute(if <Destination> is set to 'flowfile-attribute')", value="A JsonPath expression", description="If <Destination>='flowfile-attribute' then that FlowFile attribute " +
|
@DynamicProperty(name = "A FlowFile attribute(if <Destination> is set to 'flowfile-attribute')", value = "A JsonPath expression", description = "If <Destination>='flowfile-attribute' then that FlowFile attribute " +
|
||||||
"will be set to any JSON objects that match the JsonPath. If <Destination>='flowfile-content' then the FlowFile content will be updated to any JSON objects that match the JsonPath.")
|
"will be set to any JSON objects that match the JsonPath. If <Destination>='flowfile-content' then the FlowFile content will be updated to any JSON objects that match the JsonPath.")
|
||||||
public class EvaluateJsonPath extends AbstractJsonPathProcessor {
|
public class EvaluateJsonPath extends AbstractJsonPathProcessor {
|
||||||
|
|
||||||
|
@ -119,6 +111,7 @@ public class EvaluateJsonPath extends AbstractJsonPathProcessor {
|
||||||
final List<PropertyDescriptor> properties = new ArrayList<>();
|
final List<PropertyDescriptor> properties = new ArrayList<>();
|
||||||
properties.add(DESTINATION);
|
properties.add(DESTINATION);
|
||||||
properties.add(RETURN_TYPE);
|
properties.add(RETURN_TYPE);
|
||||||
|
properties.add(NULL_VALUE_DEFAULT_REPRESENTATION);
|
||||||
this.properties = Collections.unmodifiableList(properties);
|
this.properties = Collections.unmodifiableList(properties);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -211,6 +204,9 @@ public class EvaluateJsonPath extends AbstractJsonPathProcessor {
|
||||||
|
|
||||||
final ProcessorLog logger = getLogger();
|
final ProcessorLog logger = getLogger();
|
||||||
|
|
||||||
|
String representationOption = processContext.getProperty(NULL_VALUE_DEFAULT_REPRESENTATION).getValue();
|
||||||
|
final String nullDefaultValue = NULL_REPRESENTATION_MAP.get(representationOption);
|
||||||
|
|
||||||
/* Build the JsonPath expressions from attributes */
|
/* Build the JsonPath expressions from attributes */
|
||||||
final Map<String, JsonPath> attributeToJsonPathMap = new HashMap<>();
|
final Map<String, JsonPath> attributeToJsonPathMap = new HashMap<>();
|
||||||
|
|
||||||
|
@ -265,7 +261,7 @@ public class EvaluateJsonPath extends AbstractJsonPathProcessor {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
final String resultRepresentation = getResultRepresentation(resultHolder.get());
|
final String resultRepresentation = getResultRepresentation(resultHolder.get(), nullDefaultValue);
|
||||||
switch (destination) {
|
switch (destination) {
|
||||||
case DESTINATION_ATTRIBUTE:
|
case DESTINATION_ATTRIBUTE:
|
||||||
jsonPathResults.put(jsonPathAttrKey, resultRepresentation);
|
jsonPathResults.put(jsonPathAttrKey, resultRepresentation);
|
||||||
|
|
|
@ -132,6 +132,14 @@ public class PutEmail extends AbstractProcessor {
|
||||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||||
.defaultValue("NiFi")
|
.defaultValue("NiFi")
|
||||||
.build();
|
.build();
|
||||||
|
public static final PropertyDescriptor CONTENT_TYPE = new PropertyDescriptor.Builder()
|
||||||
|
.name("Content Type")
|
||||||
|
.description("Mime Type used to interpret the contents of the email, such as text/plain or text/html")
|
||||||
|
.required(true)
|
||||||
|
.expressionLanguageSupported(true)
|
||||||
|
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||||
|
.defaultValue("text/plain")
|
||||||
|
.build();
|
||||||
public static final PropertyDescriptor FROM = new PropertyDescriptor.Builder()
|
public static final PropertyDescriptor FROM = new PropertyDescriptor.Builder()
|
||||||
.name("From")
|
.name("From")
|
||||||
.description("Specifies the Email address to use as the sender")
|
.description("Specifies the Email address to use as the sender")
|
||||||
|
@ -223,6 +231,7 @@ public class PutEmail extends AbstractProcessor {
|
||||||
properties.add(SMTP_TLS);
|
properties.add(SMTP_TLS);
|
||||||
properties.add(SMTP_SOCKET_FACTORY);
|
properties.add(SMTP_SOCKET_FACTORY);
|
||||||
properties.add(HEADER_XMAILER);
|
properties.add(HEADER_XMAILER);
|
||||||
|
properties.add(CONTENT_TYPE);
|
||||||
properties.add(FROM);
|
properties.add(FROM);
|
||||||
properties.add(TO);
|
properties.add(TO);
|
||||||
properties.add(CC);
|
properties.add(CC);
|
||||||
|
@ -297,10 +306,11 @@ public class PutEmail extends AbstractProcessor {
|
||||||
if (context.getProperty(INCLUDE_ALL_ATTRIBUTES).asBoolean()) {
|
if (context.getProperty(INCLUDE_ALL_ATTRIBUTES).asBoolean()) {
|
||||||
messageText = formatAttributes(flowFile, messageText);
|
messageText = formatAttributes(flowFile, messageText);
|
||||||
}
|
}
|
||||||
|
|
||||||
message.setText(messageText);
|
String contentType = context.getProperty(CONTENT_TYPE).evaluateAttributeExpressions(flowFile).getValue();
|
||||||
|
message.setContent(messageText, contentType);
|
||||||
message.setSentDate(new Date());
|
message.setSentDate(new Date());
|
||||||
|
|
||||||
if (context.getProperty(ATTACH_FILE).asBoolean()) {
|
if (context.getProperty(ATTACH_FILE).asBoolean()) {
|
||||||
final MimeBodyPart mimeText = new PreencodedMimeBodyPart("base64");
|
final MimeBodyPart mimeText = new PreencodedMimeBodyPart("base64");
|
||||||
mimeText.setDataHandler(new DataHandler(new ByteArrayDataSource(Base64.encodeBase64(messageText.getBytes("UTF-8")), "text/plain; charset=\"utf-8\"")));
|
mimeText.setDataHandler(new DataHandler(new ByteArrayDataSource(Base64.encodeBase64(messageText.getBytes("UTF-8")), "text/plain; charset=\"utf-8\"")));
|
||||||
|
|
|
@ -74,6 +74,7 @@ public class SplitJson extends AbstractJsonPathProcessor {
|
||||||
protected void init(final ProcessorInitializationContext context) {
|
protected void init(final ProcessorInitializationContext context) {
|
||||||
final List<PropertyDescriptor> properties = new ArrayList<>();
|
final List<PropertyDescriptor> properties = new ArrayList<>();
|
||||||
properties.add(ARRAY_JSON_PATH_EXPRESSION);
|
properties.add(ARRAY_JSON_PATH_EXPRESSION);
|
||||||
|
properties.add(NULL_VALUE_DEFAULT_REPRESENTATION);
|
||||||
this.properties = Collections.unmodifiableList(properties);
|
this.properties = Collections.unmodifiableList(properties);
|
||||||
|
|
||||||
final Set<Relationship> relationships = new HashSet<>();
|
final Set<Relationship> relationships = new HashSet<>();
|
||||||
|
@ -142,6 +143,8 @@ public class SplitJson extends AbstractJsonPathProcessor {
|
||||||
}
|
}
|
||||||
|
|
||||||
final JsonPath jsonPath = JSON_PATH_REF.get();
|
final JsonPath jsonPath = JSON_PATH_REF.get();
|
||||||
|
String representationOption = processContext.getProperty(NULL_VALUE_DEFAULT_REPRESENTATION).getValue();
|
||||||
|
final String nullDefaultValue = NULL_REPRESENTATION_MAP.get(representationOption);
|
||||||
|
|
||||||
final List<FlowFile> segments = new ArrayList<>();
|
final List<FlowFile> segments = new ArrayList<>();
|
||||||
|
|
||||||
|
@ -168,7 +171,7 @@ public class SplitJson extends AbstractJsonPathProcessor {
|
||||||
split = processSession.write(split, new OutputStreamCallback() {
|
split = processSession.write(split, new OutputStreamCallback() {
|
||||||
@Override
|
@Override
|
||||||
public void process(OutputStream out) throws IOException {
|
public void process(OutputStream out) throws IOException {
|
||||||
String resultSegmentContent = getResultRepresentation(resultSegment);
|
String resultSegmentContent = getResultRepresentation(resultSegment, nullDefaultValue);
|
||||||
out.write(resultSegmentContent.getBytes(StandardCharsets.UTF_8));
|
out.write(resultSegmentContent.getBytes(StandardCharsets.UTF_8));
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
|
@ -16,7 +16,11 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.nifi.processors.standard;
|
package org.apache.nifi.processors.standard;
|
||||||
|
|
||||||
|
import org.apache.nifi.flowfile.FlowFile;
|
||||||
|
import org.apache.nifi.processor.ProcessSession;
|
||||||
import org.apache.nifi.processor.Relationship;
|
import org.apache.nifi.processor.Relationship;
|
||||||
|
import org.apache.nifi.processor.io.OutputStreamCallback;
|
||||||
|
import org.apache.nifi.stream.io.BufferedOutputStream;
|
||||||
import org.apache.nifi.util.MockFlowFile;
|
import org.apache.nifi.util.MockFlowFile;
|
||||||
import org.apache.nifi.util.StringUtils;
|
import org.apache.nifi.util.StringUtils;
|
||||||
import org.apache.nifi.util.TestRunner;
|
import org.apache.nifi.util.TestRunner;
|
||||||
|
@ -24,9 +28,14 @@ import org.apache.nifi.util.TestRunners;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.OutputStream;
|
||||||
|
import java.nio.charset.StandardCharsets;
|
||||||
import java.nio.file.Path;
|
import java.nio.file.Path;
|
||||||
import java.nio.file.Paths;
|
import java.nio.file.Paths;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
|
||||||
public class TestEvaluateJsonPath {
|
public class TestEvaluateJsonPath {
|
||||||
|
|
||||||
private static final Path JSON_SNIPPET = Paths.get("src/test/resources/TestJson/json-sample.json");
|
private static final Path JSON_SNIPPET = Paths.get("src/test/resources/TestJson/json-sample.json");
|
||||||
|
@ -261,4 +270,81 @@ public class TestEvaluateJsonPath {
|
||||||
testRunner.getFlowFilesForRelationship(expectedRel).get(0).assertContentEquals(JSON_SNIPPET);
|
testRunner.getFlowFilesForRelationship(expectedRel).get(0).assertContentEquals(JSON_SNIPPET);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testNullInput() throws Exception {
|
||||||
|
final TestRunner testRunner = TestRunners.newTestRunner(new EvaluateJsonPath());
|
||||||
|
testRunner.setProperty(EvaluateJsonPath.RETURN_TYPE, EvaluateJsonPath.RETURN_TYPE_JSON);
|
||||||
|
testRunner.setProperty(EvaluateJsonPath.DESTINATION, EvaluateJsonPath.DESTINATION_ATTRIBUTE);
|
||||||
|
testRunner.setProperty("stringField", "$.stringField");
|
||||||
|
testRunner.setProperty("missingField", "$.missingField");
|
||||||
|
testRunner.setProperty("nullField", "$.nullField");
|
||||||
|
|
||||||
|
ProcessSession session = testRunner.getProcessSessionFactory().createSession();
|
||||||
|
FlowFile ff = session.create();
|
||||||
|
|
||||||
|
ff = session.write(ff, new OutputStreamCallback() {
|
||||||
|
@Override
|
||||||
|
public void process(OutputStream out) throws IOException {
|
||||||
|
try (OutputStream outputStream = new BufferedOutputStream(out)) {
|
||||||
|
outputStream.write("{\"stringField\": \"String Value\", \"nullField\": null}".getBytes(StandardCharsets.UTF_8));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
testRunner.enqueue(ff);
|
||||||
|
testRunner.run();
|
||||||
|
|
||||||
|
testRunner.assertTransferCount(EvaluateJsonPath.REL_MATCH, 1);
|
||||||
|
|
||||||
|
FlowFile output = testRunner.getFlowFilesForRelationship(EvaluateJsonPath.REL_MATCH).get(0);
|
||||||
|
|
||||||
|
String validFieldValue = output.getAttribute("stringField");
|
||||||
|
assertEquals("String Value", validFieldValue);
|
||||||
|
|
||||||
|
String missingValue = output.getAttribute("missingField");
|
||||||
|
assertEquals("Missing Value", "", missingValue);
|
||||||
|
|
||||||
|
String nullValue = output.getAttribute("nullField");
|
||||||
|
assertEquals("Null Value", "", nullValue);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testNullInput_nullStringRepresentation() throws Exception {
|
||||||
|
final TestRunner testRunner = TestRunners.newTestRunner(new EvaluateJsonPath());
|
||||||
|
testRunner.setProperty(EvaluateJsonPath.RETURN_TYPE, EvaluateJsonPath.RETURN_TYPE_JSON);
|
||||||
|
testRunner.setProperty(EvaluateJsonPath.DESTINATION, EvaluateJsonPath.DESTINATION_ATTRIBUTE);
|
||||||
|
testRunner.setProperty(EvaluateJsonPath.NULL_VALUE_DEFAULT_REPRESENTATION, AbstractJsonPathProcessor.NULL_STRING_OPTION);
|
||||||
|
testRunner.setProperty("stringField", "$.stringField");
|
||||||
|
testRunner.setProperty("missingField", "$.missingField");
|
||||||
|
testRunner.setProperty("nullField", "$.nullField");
|
||||||
|
|
||||||
|
ProcessSession session = testRunner.getProcessSessionFactory().createSession();
|
||||||
|
FlowFile ff = session.create();
|
||||||
|
|
||||||
|
ff = session.write(ff, new OutputStreamCallback() {
|
||||||
|
@Override
|
||||||
|
public void process(OutputStream out) throws IOException {
|
||||||
|
try (OutputStream outputStream = new BufferedOutputStream(out)) {
|
||||||
|
outputStream.write("{\"stringField\": \"String Value\", \"nullField\": null}".getBytes(StandardCharsets.UTF_8));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
testRunner.enqueue(ff);
|
||||||
|
testRunner.run();
|
||||||
|
|
||||||
|
testRunner.assertTransferCount(EvaluateJsonPath.REL_MATCH, 1);
|
||||||
|
|
||||||
|
FlowFile output = testRunner.getFlowFilesForRelationship(EvaluateJsonPath.REL_MATCH).get(0);
|
||||||
|
|
||||||
|
String validFieldValue = output.getAttribute("stringField");
|
||||||
|
assertEquals("String Value", validFieldValue);
|
||||||
|
|
||||||
|
String missingValue = output.getAttribute("missingField");
|
||||||
|
assertEquals("Missing Value", "", missingValue);
|
||||||
|
|
||||||
|
String nullValue = output.getAttribute("nullField");
|
||||||
|
assertEquals("Null Value", "null", nullValue);
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -16,13 +16,20 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.nifi.processors.standard;
|
package org.apache.nifi.processors.standard;
|
||||||
|
|
||||||
|
import org.apache.nifi.flowfile.FlowFile;
|
||||||
|
import org.apache.nifi.processor.ProcessSession;
|
||||||
import org.apache.nifi.processor.Relationship;
|
import org.apache.nifi.processor.Relationship;
|
||||||
|
import org.apache.nifi.processor.io.OutputStreamCallback;
|
||||||
|
import org.apache.nifi.stream.io.BufferedOutputStream;
|
||||||
import org.apache.nifi.util.MockFlowFile;
|
import org.apache.nifi.util.MockFlowFile;
|
||||||
import org.apache.nifi.util.TestRunner;
|
import org.apache.nifi.util.TestRunner;
|
||||||
import org.apache.nifi.util.TestRunners;
|
import org.apache.nifi.util.TestRunners;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.OutputStream;
|
||||||
|
import java.nio.charset.StandardCharsets;
|
||||||
import java.nio.file.Path;
|
import java.nio.file.Path;
|
||||||
import java.nio.file.Paths;
|
import java.nio.file.Paths;
|
||||||
|
|
||||||
|
@ -123,4 +130,85 @@ public class TestSplitJson {
|
||||||
testRunner.assertTransferCount(SplitJson.REL_FAILURE, 1);
|
testRunner.assertTransferCount(SplitJson.REL_FAILURE, 1);
|
||||||
testRunner.getFlowFilesForRelationship(SplitJson.REL_FAILURE).get(0).assertContentEquals(JSON_SNIPPET);
|
testRunner.getFlowFilesForRelationship(SplitJson.REL_FAILURE).get(0).assertContentEquals(JSON_SNIPPET);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSplit_pathToNullValue() throws Exception {
|
||||||
|
final TestRunner testRunner = TestRunners.newTestRunner(new SplitJson());
|
||||||
|
testRunner.setProperty(SplitJson.ARRAY_JSON_PATH_EXPRESSION, "$.nullField");
|
||||||
|
|
||||||
|
ProcessSession session = testRunner.getProcessSessionFactory().createSession();
|
||||||
|
FlowFile ff = session.create();
|
||||||
|
|
||||||
|
ff = session.write(ff, new OutputStreamCallback() {
|
||||||
|
@Override
|
||||||
|
public void process(OutputStream out) throws IOException {
|
||||||
|
try (OutputStream outputStream = new BufferedOutputStream(out)) {
|
||||||
|
outputStream.write("{\"stringField\": \"String Value\", \"nullField\": null}".getBytes(StandardCharsets.UTF_8));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
testRunner.enqueue(ff);
|
||||||
|
testRunner.run();
|
||||||
|
|
||||||
|
testRunner.assertTransferCount(SplitJson.REL_FAILURE, 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSplit_pathToArrayWithNulls_emptyStringRepresentation() throws Exception {
|
||||||
|
final TestRunner testRunner = TestRunners.newTestRunner(new SplitJson());
|
||||||
|
testRunner.setProperty(SplitJson.ARRAY_JSON_PATH_EXPRESSION, "$.arrayOfNulls");
|
||||||
|
|
||||||
|
ProcessSession session = testRunner.getProcessSessionFactory().createSession();
|
||||||
|
FlowFile ff = session.create();
|
||||||
|
|
||||||
|
ff = session.write(ff, new OutputStreamCallback() {
|
||||||
|
@Override
|
||||||
|
public void process(OutputStream out) throws IOException {
|
||||||
|
try (OutputStream outputStream = new BufferedOutputStream(out)) {
|
||||||
|
outputStream.write("{\"stringField\": \"String Value\", \"arrayOfNulls\": [null, null, null]}".getBytes(StandardCharsets.UTF_8));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
testRunner.enqueue(ff);
|
||||||
|
testRunner.run();
|
||||||
|
|
||||||
|
/* assert that three files were transferred to split and each is empty */
|
||||||
|
int expectedFiles = 3;
|
||||||
|
testRunner.assertTransferCount(SplitJson.REL_SPLIT, expectedFiles);
|
||||||
|
for (int i = 0; i < expectedFiles; i++) {
|
||||||
|
testRunner.getFlowFilesForRelationship(SplitJson.REL_SPLIT).get(i).assertContentEquals("");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSplit_pathToArrayWithNulls_nullStringRepresentation() throws Exception {
|
||||||
|
final TestRunner testRunner = TestRunners.newTestRunner(new SplitJson());
|
||||||
|
testRunner.setProperty(SplitJson.ARRAY_JSON_PATH_EXPRESSION, "$.arrayOfNulls");
|
||||||
|
testRunner.setProperty(SplitJson.NULL_VALUE_DEFAULT_REPRESENTATION,
|
||||||
|
AbstractJsonPathProcessor.NULL_STRING_OPTION);
|
||||||
|
|
||||||
|
ProcessSession session = testRunner.getProcessSessionFactory().createSession();
|
||||||
|
FlowFile ff = session.create();
|
||||||
|
|
||||||
|
ff = session.write(ff, new OutputStreamCallback() {
|
||||||
|
@Override
|
||||||
|
public void process(OutputStream out) throws IOException {
|
||||||
|
try (OutputStream outputStream = new BufferedOutputStream(out)) {
|
||||||
|
outputStream.write("{\"stringField\": \"String Value\", \"arrayOfNulls\": [null, null, null]}".getBytes(StandardCharsets.UTF_8));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
testRunner.enqueue(ff);
|
||||||
|
testRunner.run();
|
||||||
|
|
||||||
|
/* assert that three files were transferred to split and each has the word null in it */
|
||||||
|
int expectedFiles = 3;
|
||||||
|
testRunner.assertTransferCount(SplitJson.REL_SPLIT, expectedFiles);
|
||||||
|
for (int i = 0; i < expectedFiles; i++) {
|
||||||
|
testRunner.getFlowFilesForRelationship(SplitJson.REL_SPLIT).get(i).assertContentEquals("null");
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -84,7 +84,7 @@
|
||||||
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
|
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
|
||||||
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
|
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
|
||||||
<org.slf4j.version>1.7.10</org.slf4j.version>
|
<org.slf4j.version>1.7.10</org.slf4j.version>
|
||||||
<jetty.version>9.2.5.v20141112</jetty.version>
|
<jetty.version>9.2.10.v20150310</jetty.version>
|
||||||
<lucene.version>4.10.3</lucene.version>
|
<lucene.version>4.10.3</lucene.version>
|
||||||
<spring.version>4.1.4.RELEASE</spring.version>
|
<spring.version>4.1.4.RELEASE</spring.version>
|
||||||
<spring.security.version>3.2.5.RELEASE</spring.security.version>
|
<spring.security.version>3.2.5.RELEASE</spring.security.version>
|
||||||
|
|
Loading…
Reference in New Issue