diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml index 2557d1a442..89021c6004 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml @@ -395,6 +395,7 @@ src/test/resources/TestConvertJSONToSQL/person-with-null-code.json src/test/resources/TestConvertJSONToSQL/person-without-id.json src/test/resources/TestConvertJSONToSQL/person-with-bool.json + src/test/resources/TestCountText/jabberwocky.txt src/test/resources/TestModifyBytes/noFooter.txt src/test/resources/TestModifyBytes/noFooter_noHeader.txt src/test/resources/TestModifyBytes/noHeader.txt diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CountText.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CountText.java new file mode 100644 index 0000000000..20195bdb20 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CountText.java @@ -0,0 +1,327 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.text.DecimalFormat; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.regex.Pattern; +import java.util.stream.Collectors; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.SideEffectFree; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.util.StringUtils; + +@EventDriven +@SideEffectFree +@SupportsBatching +@Tags({"count", "text", "line", "word", "character"}) +@InputRequirement(Requirement.INPUT_REQUIRED) +@CapabilityDescription("Counts various metrics on incoming text. The requested results will be recorded as attributes. " + + "The resulting flowfile will not have its content modified.") +@WritesAttributes({ + @WritesAttribute(attribute = "text.line.count", description = "The number of lines of text present in the FlowFile content"), + @WritesAttribute(attribute = "text.line.nonempty.count", description = "The number of lines of text (with at least one non-whitespace character) present in the original FlowFile"), + @WritesAttribute(attribute = "text.word.count", description = "The number of words present in the original FlowFile"), + @WritesAttribute(attribute = "text.character.count", description = "The number of characters (given the specified character encoding) present in the original FlowFile"), +}) +@SeeAlso(SplitText.class) +public class CountText extends AbstractProcessor { + private static final List STANDARD_CHARSETS = Arrays.asList( + StandardCharsets.UTF_8, + StandardCharsets.US_ASCII, + StandardCharsets.ISO_8859_1, + StandardCharsets.UTF_16, + StandardCharsets.UTF_16LE, + StandardCharsets.UTF_16BE); + + private static final Pattern SYMBOL_PATTERN = Pattern.compile("[\\s-\\._]"); + private static final Pattern WHITESPACE_ONLY_PATTERN = Pattern.compile("\\s"); + + // Attribute keys + public static final String TEXT_LINE_COUNT = "text.line.count"; + public static final String TEXT_LINE_NONEMPTY_COUNT = "text.line.nonempty.count"; + public static final String TEXT_WORD_COUNT = "text.word.count"; + public static final String TEXT_CHARACTER_COUNT = "text.character.count"; + + + public static final PropertyDescriptor TEXT_LINE_COUNT_PD = new PropertyDescriptor.Builder() + .name("text-line-count") + .displayName("Count Lines") + .description("If enabled, will count the number of lines present in the incoming text.") + .required(true) + .allowableValues("true", "false") + .defaultValue("true") + .addValidator(StandardValidators.BOOLEAN_VALIDATOR) + .build(); + public static final PropertyDescriptor TEXT_LINE_NONEMPTY_COUNT_PD = new PropertyDescriptor.Builder() + .name("text-line-nonempty-count") + .displayName("Count Non-Empty Lines") + .description("If enabled, will count the number of lines that contain a non-whitespace character present in the incoming text.") + .required(true) + .allowableValues("true", "false") + .defaultValue("false") + .addValidator(StandardValidators.BOOLEAN_VALIDATOR) + .build(); + public static final PropertyDescriptor TEXT_WORD_COUNT_PD = new PropertyDescriptor.Builder() + .name("text-word-count") + .displayName("Count Words") + .description("If enabled, will count the number of words (alphanumeric character groups bounded by whitespace)" + + " present in the incoming text. Common logical delimiters [_-.] do not bound a word unless 'Split Words on Symbols' is true.") + .required(true) + .allowableValues("true", "false") + .defaultValue("false") + .addValidator(StandardValidators.BOOLEAN_VALIDATOR) + .build(); + public static final PropertyDescriptor TEXT_CHARACTER_COUNT_PD = new PropertyDescriptor.Builder() + .name("text-character-count") + .displayName("Count Characters") + .description("If enabled, will count the number of characters (including whitespace and symbols, but not including newlines and carriage returns) present in the incoming text.") + .required(true) + .allowableValues("true", "false") + .defaultValue("false") + .addValidator(StandardValidators.BOOLEAN_VALIDATOR) + .build(); + public static final PropertyDescriptor SPLIT_WORDS_ON_SYMBOLS_PD = new PropertyDescriptor.Builder() + .name("split-words-on-symbols") + .displayName("Split Words on Symbols") + .description("If enabled, the word count will identify strings separated by common logical delimiters [ _ - . ] as independent words (ex. split-words-on-symbols = 4 words).") + .required(true) + .allowableValues("true", "false") + .defaultValue("false") + .addValidator(StandardValidators.BOOLEAN_VALIDATOR) + .build(); + public static final PropertyDescriptor CHARACTER_ENCODING_PD = new PropertyDescriptor.Builder() + .name("character-encoding") + .displayName("Character Encoding") + .description("Specifies a character encoding to use.") + .required(true) + .allowableValues(getStandardCharsetNames()) + .defaultValue(StandardCharsets.UTF_8.displayName()) + .build(); + + private static Set getStandardCharsetNames() { + return STANDARD_CHARSETS.stream().map(c -> c.displayName()).collect(Collectors.toSet()); + } + + public static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("The flowfile contains the original content with one or more attributes added containing the respective counts") + .build(); + public static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description("If the flowfile text cannot be counted for some reason, the original file will be routed to this destination and nothing will be routed elsewhere") + .build(); + + private static final List properties; + private static final Set relationships; + + static { + properties = Collections.unmodifiableList(Arrays.asList(TEXT_LINE_COUNT_PD, + TEXT_LINE_NONEMPTY_COUNT_PD, + TEXT_WORD_COUNT_PD, + TEXT_CHARACTER_COUNT_PD, + SPLIT_WORDS_ON_SYMBOLS_PD, + CHARACTER_ENCODING_PD)); + + relationships = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(REL_SUCCESS, + REL_FAILURE))); + } + + private volatile boolean countLines; + private volatile boolean countLinesNonEmpty; + private volatile boolean countWords; + private volatile boolean countCharacters; + private volatile boolean splitWordsOnSymbols; + private volatile String characterEncoding = StandardCharsets.UTF_8.name(); + + private volatile int lineCount; + private volatile int lineNonEmptyCount; + private volatile int wordCount; + private volatile int characterCount; + + @Override + public Set getRelationships() { + return relationships; + } + + @OnScheduled + public void onSchedule(ProcessContext context) { + this.countLines = context.getProperty(TEXT_LINE_COUNT_PD).isSet() + ? context.getProperty(TEXT_LINE_COUNT_PD).asBoolean() : false; + this.countLinesNonEmpty = context.getProperty(TEXT_LINE_NONEMPTY_COUNT_PD).isSet() + ? context.getProperty(TEXT_LINE_NONEMPTY_COUNT_PD).asBoolean() : false; + this.countWords = context.getProperty(TEXT_WORD_COUNT_PD).isSet() + ? context.getProperty(TEXT_WORD_COUNT_PD).asBoolean() : false; + this.countCharacters = context.getProperty(TEXT_CHARACTER_COUNT_PD).isSet() + ? context.getProperty(TEXT_CHARACTER_COUNT_PD).asBoolean() : false; + this.splitWordsOnSymbols = context.getProperty(SPLIT_WORDS_ON_SYMBOLS_PD).isSet() + ? context.getProperty(SPLIT_WORDS_ON_SYMBOLS_PD).asBoolean() : false; + this.characterEncoding = context.getProperty(CHARACTER_ENCODING_PD).getValue(); + } + + /** + * Will count text attributes of the incoming stream. + */ + @Override + public void onTrigger(ProcessContext context, ProcessSession processSession) throws ProcessException { + FlowFile sourceFlowFile = processSession.get(); + if (sourceFlowFile == null) { + return; + } + AtomicBoolean error = new AtomicBoolean(); + + lineCount = 0; + lineNonEmptyCount = 0; + wordCount = 0; + characterCount = 0; + + processSession.read(sourceFlowFile, in -> { + long start = System.nanoTime(); + + // Iterate over the lines in the text input + try { + BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(in, characterEncoding)); + String line; + while ((line = bufferedReader.readLine()) != null) { + if (countLines) { + lineCount++; + } + + if (countLinesNonEmpty) { + if (line.trim().length() > 0) { + lineNonEmptyCount++; + } + } + + if (countWords) { + wordCount += countWordsInLine(line, splitWordsOnSymbols); + } + + if (countCharacters) { + characterCount += line.length(); + } + } + long stop = System.nanoTime(); + if (getLogger().isDebugEnabled()) { + final long durationNanos = stop - start; + DecimalFormat df = new DecimalFormat("#.###"); + getLogger().debug("Computed metrics in " + durationNanos + " nanoseconds (" + df.format(durationNanos / 1_000_000_000.0) + " seconds)."); + } + if (getLogger().isInfoEnabled()) { + String message = generateMetricsMessage(); + getLogger().info(message); + } + + // Update session counters + processSession.adjustCounter("Lines Counted", (long) lineCount, false); + processSession.adjustCounter("Lines (non-empty) Counted", (long) lineNonEmptyCount, false); + processSession.adjustCounter("Words Counted", (long) wordCount, false); + processSession.adjustCounter("Characters Counted", (long) characterCount, false); + } catch (IOException e) { + error.set(true); + getLogger().error(e.getMessage() + " Routing to failure.", e); + } + }); + + if (error.get()) { + processSession.transfer(sourceFlowFile, REL_FAILURE); + } else { + Map metricAttributes = new HashMap<>(); + if (countLines) { + metricAttributes.put(TEXT_LINE_COUNT, String.valueOf(lineCount)); + } + if (countLinesNonEmpty) { + metricAttributes.put(TEXT_LINE_NONEMPTY_COUNT, String.valueOf(lineNonEmptyCount)); + } + if (countWords) { + metricAttributes.put(TEXT_WORD_COUNT, String.valueOf(wordCount)); + } + if (countCharacters) { + metricAttributes.put(TEXT_CHARACTER_COUNT, String.valueOf(characterCount)); + } + FlowFile updatedFlowFile = processSession.putAllAttributes(sourceFlowFile, metricAttributes); + processSession.transfer(updatedFlowFile, REL_SUCCESS); + } + } + + private String generateMetricsMessage() { + StringBuilder sb = new StringBuilder("Counted "); + List metrics = new ArrayList<>(); + if (countLines) { + metrics.add(lineCount + " lines"); + } + if (countLinesNonEmpty) { + metrics.add(lineNonEmptyCount + " non-empty lines"); + } + if (countWords) { + metrics.add(wordCount + " words"); + } + if (countCharacters) { + metrics.add(characterCount + " characters"); + } + sb.append(StringUtils.join(metrics, ", ")); + return sb.toString(); + } + + int countWordsInLine(String line, boolean splitWordsOnSymbols) throws IOException { + if (line == null || line.trim().length() == 0) { + return 0; + } else { + Pattern regex = splitWordsOnSymbols ? SYMBOL_PATTERN : WHITESPACE_ONLY_PATTERN; + final String[] words = regex.split(line); + // TODO: Trim individual words before counting to eliminate whitespace words? + if (getLogger().isDebugEnabled()) { + getLogger().debug("Split [" + line + "] to [" + StringUtils.join(Arrays.asList(words), ", ") + "] (" + words.length + ")"); + } + return words.length; + } + } + + @Override + protected List getSupportedPropertyDescriptors() { + return properties; + } +} diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor index 3fb0de3405..a93b37fd5b 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -19,6 +19,7 @@ org.apache.nifi.processors.standard.ControlRate org.apache.nifi.processors.standard.ConvertCharacterSet org.apache.nifi.processors.standard.ConvertJSONToSQL org.apache.nifi.processors.standard.ConvertRecord +org.apache.nifi.processors.standard.CountText org.apache.nifi.processors.standard.DebugFlow org.apache.nifi.processors.standard.DetectDuplicate org.apache.nifi.processors.standard.DistributeLoad diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/CountTextTest.groovy b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/CountTextTest.groovy new file mode 100644 index 0000000000..52b69081c0 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/CountTextTest.groovy @@ -0,0 +1,332 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License") you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard + +import org.apache.nifi.components.PropertyDescriptor +import org.apache.nifi.flowfile.FlowFile +import org.apache.nifi.util.MockProcessSession +import org.apache.nifi.util.TestRunner +import org.apache.nifi.util.TestRunners +import org.bouncycastle.jce.provider.BouncyCastleProvider +import org.junit.After +import org.junit.Before +import org.junit.BeforeClass +import org.junit.Test +import org.junit.runner.RunWith +import org.junit.runners.JUnit4 +import org.mockito.Mockito +import org.slf4j.Logger +import org.slf4j.LoggerFactory + +import java.nio.charset.StandardCharsets +import java.security.Security + +import static org.mockito.Matchers.anyBoolean +import static org.mockito.Matchers.anyString +import static org.mockito.Mockito.when + +@RunWith(JUnit4.class) +class CountTextTest extends GroovyTestCase { + private static final Logger logger = LoggerFactory.getLogger(CountTextTest.class) + + private static final String TLC = "text.line.count" + private static final String TLNEC = "text.line.nonempty.count" + private static final String TWC = "text.word.count" + private static final String TCC = "text.character.count" + + + @BeforeClass + static void setUpOnce() throws Exception { + Security.addProvider(new BouncyCastleProvider()) + + logger.metaClass.methodMissing = { String name, args -> + logger.info("[${name?.toUpperCase()}] ${(args as List).join(" ")}") + } + } + + @Before + void setUp() throws Exception { + } + + @After + void tearDown() throws Exception { + } + + @Test + void testShouldCountAllMetrics() throws Exception { + // Arrange + final TestRunner runner = TestRunners.newTestRunner(CountText.class) + + runner.setProperty(CountText.TEXT_LINE_COUNT_PD, "true") + runner.setProperty(CountText.TEXT_LINE_NONEMPTY_COUNT_PD, "true") + runner.setProperty(CountText.TEXT_WORD_COUNT_PD, "true") + runner.setProperty(CountText.TEXT_CHARACTER_COUNT_PD, "true") + + // This text is the same as in src/test/resources/TestCountText/jabberwocky.txt but is copied here + // to ensure that reading from a file vs. static text doesn't cause line break issues + String INPUT_TEXT = """’Twas brillig, and the slithy toves +Did gyre and gimble in the wade; +All mimsy were the borogoves, +And the mome raths outgrabe. + +"Beware the Jabberwock, my son! +The jaws that bite, the claws that catch! +Beware the Jubjub bird, and shun +The frumious Bandersnatch!" + +He took his vorpal sword in hand: +Long time the manxome foe he sought— +So rested he by the Tumtum tree, +And stood awhile in thought. + +And as in uffish thought he stood, +The Jabberwock, with eyes of flame, +Came whiffling through the tulgey wood. +And burbled as it came! + +One, two! One, two! And through and through +The vorpal blade went snicker-snack! +He left it dead, and with its head +He went galumphing back. + +"And hast thou slain the Jabberwock? +Come to my arms, my beamish boy! +O frabjous day! Callooh! Callay!" +He chortled in his joy. + +’Twas brillig, and the slithy toves +Did gyre and gimble in the wabe; +All mimsy were the borogoves, +And the mome raths outgrabe.""" + + runner.enqueue(INPUT_TEXT.bytes) + + // Act + runner.run() + + // Assert + runner.assertAllFlowFilesTransferred(CountText.REL_SUCCESS, 1) + FlowFile flowFile = runner.getFlowFilesForRelationship(CountText.REL_SUCCESS).first() + assert flowFile.attributes."$TLC" == 34 as String + assert flowFile.attributes."$TLNEC" == 28 as String + assert flowFile.attributes."$TWC" == 166 as String + assert flowFile.attributes."$TCC" == 900 as String + } + + @Test + void testShouldCountEachMetric() throws Exception { + // Arrange + final TestRunner runner = TestRunners.newTestRunner(CountText.class) + String INPUT_TEXT = new File("src/test/resources/TestCountText/jabberwocky.txt").text + + final def EXPECTED_VALUES = [ + (TLC) : 34, + (TLNEC): 28, + (TWC) : 166, + (TCC) : 900, + ] + + def linesOnly = [(CountText.TEXT_LINE_COUNT_PD): "true"] + def linesNonEmptyOnly = [(CountText.TEXT_LINE_NONEMPTY_COUNT_PD): "true"] + def wordsOnly = [(CountText.TEXT_WORD_COUNT_PD): "true"] + def charactersOnly = [(CountText.TEXT_CHARACTER_COUNT_PD): "true"] + + final List> SCENARIOS = [linesOnly, linesNonEmptyOnly, wordsOnly, charactersOnly] + + SCENARIOS.each { map -> + // Reset the processor properties + runner.setProperty(CountText.TEXT_LINE_COUNT_PD, "false") + runner.setProperty(CountText.TEXT_LINE_NONEMPTY_COUNT_PD, "false") + runner.setProperty(CountText.TEXT_WORD_COUNT_PD, "false") + runner.setProperty(CountText.TEXT_CHARACTER_COUNT_PD, "false") + + // Apply the scenario-specific properties + map.each { key, value -> + runner.setProperty(key, value) + } + + runner.clearProvenanceEvents() + runner.clearTransferState() + runner.enqueue(INPUT_TEXT.bytes) + + // Act + runner.run() + + // Assert + runner.assertAllFlowFilesTransferred(CountText.REL_SUCCESS, 1) + FlowFile flowFile = runner.getFlowFilesForRelationship(CountText.REL_SUCCESS).first() + logger.info("Generated flowfile: ${flowFile} | ${flowFile.attributes}") + EXPECTED_VALUES.each { key, value -> + if (flowFile.attributes.containsKey(key)) { + assert flowFile.attributes.get(key) == value as String + } + } + } + } + + @Test + void testShouldCountWordsSplitOnSymbol() throws Exception { + // Arrange + final TestRunner runner = TestRunners.newTestRunner(CountText.class) + String INPUT_TEXT = new File("src/test/resources/TestCountText/jabberwocky.txt").text + + final int EXPECTED_WORD_COUNT = 167 + + // Reset the processor properties + runner.setProperty(CountText.TEXT_LINE_COUNT_PD, "false") + runner.setProperty(CountText.TEXT_LINE_NONEMPTY_COUNT_PD, "false") + runner.setProperty(CountText.TEXT_WORD_COUNT_PD, "true") + runner.setProperty(CountText.TEXT_CHARACTER_COUNT_PD, "false") + runner.setProperty(CountText.SPLIT_WORDS_ON_SYMBOLS_PD, "true") + + runner.clearProvenanceEvents() + runner.clearTransferState() + runner.enqueue(INPUT_TEXT.bytes) + + // Act + runner.run() + + // Assert + runner.assertAllFlowFilesTransferred(CountText.REL_SUCCESS, 1) + FlowFile flowFile = runner.getFlowFilesForRelationship(CountText.REL_SUCCESS).first() + logger.info("Generated flowfile: ${flowFile} | ${flowFile.attributes}") + assert flowFile.attributes.get(CountText.TEXT_WORD_COUNT) == EXPECTED_WORD_COUNT as String + } + + @Test + void testShouldCountIndependentlyPerFlowFile() throws Exception { + // Arrange + final TestRunner runner = TestRunners.newTestRunner(CountText.class) + String INPUT_TEXT = new File("src/test/resources/TestCountText/jabberwocky.txt").text + + final def EXPECTED_VALUES = [ + (TLC) : 34, + (TLNEC): 28, + (TWC) : 166, + (TCC) : 900, + ] + + // Reset the processor properties + runner.setProperty(CountText.TEXT_LINE_COUNT_PD, "true") + runner.setProperty(CountText.TEXT_LINE_NONEMPTY_COUNT_PD, "true") + runner.setProperty(CountText.TEXT_WORD_COUNT_PD, "true") + runner.setProperty(CountText.TEXT_CHARACTER_COUNT_PD, "true") + + 2.times { int i -> + runner.clearProvenanceEvents() + runner.clearTransferState() + runner.enqueue(INPUT_TEXT.bytes) + + // Act + runner.run() + + // Assert + runner.assertAllFlowFilesTransferred(CountText.REL_SUCCESS, 1) + FlowFile flowFile = runner.getFlowFilesForRelationship(CountText.REL_SUCCESS).first() + logger.info("Generated flowfile: ${flowFile} | ${flowFile.attributes}") + EXPECTED_VALUES.each { key, value -> + if (flowFile.attributes.containsKey(key)) { + assert flowFile.attributes.get(key) == value as String + } + } + } + } + + @Test + void testShouldTrackSessionCountersAcrossMultipleFlowfiles() throws Exception { + // Arrange + final TestRunner runner = TestRunners.newTestRunner(CountText.class) + String INPUT_TEXT = new File("src/test/resources/TestCountText/jabberwocky.txt").text + + final def EXPECTED_VALUES = [ + (TLC) : 34, + (TLNEC): 28, + (TWC) : 166, + (TCC) : 900, + ] + + // Reset the processor properties + runner.setProperty(CountText.TEXT_LINE_COUNT_PD, "true") + runner.setProperty(CountText.TEXT_LINE_NONEMPTY_COUNT_PD, "true") + runner.setProperty(CountText.TEXT_WORD_COUNT_PD, "true") + runner.setProperty(CountText.TEXT_CHARACTER_COUNT_PD, "true") + + MockProcessSession mockPS = runner.processSessionFactory.createSession() + def sessionCounters = mockPS.sharedState.counterMap + logger.info("Session counters (0): ${sessionCounters}") + + int n = 2 + + n.times { int i -> + runner.clearTransferState() + runner.enqueue(INPUT_TEXT.bytes) + + // Act + runner.run() + logger.info("Session counters (${i + 1}): ${sessionCounters}") + + // Assert + runner.assertAllFlowFilesTransferred(CountText.REL_SUCCESS, 1) + FlowFile flowFile = runner.getFlowFilesForRelationship(CountText.REL_SUCCESS).first() + logger.info("Generated flowfile: ${flowFile} | ${flowFile.attributes}") + EXPECTED_VALUES.each { key, value -> + if (flowFile.attributes.containsKey(key)) { + assert flowFile.attributes.get(key) == value as String + } + } + } + + assert sessionCounters.get("Lines Counted").get() == EXPECTED_VALUES[TLC] * n as long + assert sessionCounters.get("Lines (non-empty) Counted").get() == EXPECTED_VALUES[TLNEC] * n as long + assert sessionCounters.get("Words Counted").get() == EXPECTED_VALUES[TWC] * n as long + assert sessionCounters.get("Characters Counted").get() == EXPECTED_VALUES[TCC] * n as long + } + + @Test + void testShouldHandleInternalError() throws Exception { + // Arrange + CountText ct = new CountText() + ct.countLines = true + ct.countLinesNonEmpty = true + ct.countWords = true + ct.countCharacters = true + + CountText ctSpy = Mockito.spy(ct) + when(ctSpy.countWordsInLine(anyString(), anyBoolean())).thenThrow(new IOException("Expected exception")) + + final TestRunner runner = TestRunners.newTestRunner(ctSpy) + final String INPUT_TEXT = "This flowfile should throw an error" + + // Reset the processor properties + runner.setProperty(CountText.TEXT_LINE_COUNT_PD, "true") + runner.setProperty(CountText.TEXT_LINE_NONEMPTY_COUNT_PD, "true") + runner.setProperty(CountText.TEXT_WORD_COUNT_PD, "true") + runner.setProperty(CountText.TEXT_CHARACTER_COUNT_PD, "true") + runner.setProperty(CountText.CHARACTER_ENCODING_PD, StandardCharsets.US_ASCII.displayName()) + + runner.enqueue(INPUT_TEXT.bytes) + + // Act + // Need initialize = true to run #onScheduled() + runner.run(1, true, true) + + // Assert + runner.assertAllFlowFilesTransferred(CountText.REL_FAILURE, 1) + FlowFile flowFile = runner.getFlowFilesForRelationship(CountText.REL_FAILURE).first() + logger.info("Generated flowfile: ${flowFile} | ${flowFile.attributes}") + } +} diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestCountText/jabberwocky.txt b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestCountText/jabberwocky.txt new file mode 100644 index 0000000000..9d76b37728 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestCountText/jabberwocky.txt @@ -0,0 +1,34 @@ +’Twas brillig, and the slithy toves +Did gyre and gimble in the wade; +All mimsy were the borogoves, +And the mome raths outgrabe. + +"Beware the Jabberwock, my son! +The jaws that bite, the claws that catch! +Beware the Jubjub bird, and shun +The frumious Bandersnatch!" + +He took his vorpal sword in hand: +Long time the manxome foe he sought— +So rested he by the Tumtum tree, +And stood awhile in thought. + +And as in uffish thought he stood, +The Jabberwock, with eyes of flame, +Came whiffling through the tulgey wood. +And burbled as it came! + +One, two! One, two! And through and through +The vorpal blade went snicker-snack! +He left it dead, and with its head +He went galumphing back. + +"And hast thou slain the Jabberwock? +Come to my arms, my beamish boy! +O frabjous day! Callooh! Callay!" +He chortled in his joy. + +’Twas brillig, and the slithy toves +Did gyre and gimble in the wabe; +All mimsy were the borogoves, +And the mome raths outgrabe. \ No newline at end of file