NIFI-4727 Added CountText processor and unit test.

This closes #2371.

Signed-off-by: Kevin Doran <kdoran.apache@gmail.org>
This commit is contained in:
Andy LoPresto 2018-01-02 14:47:33 -05:00 committed by Andy LoPresto
parent 4196140e4c
commit a7f1eb89c2
No known key found for this signature in database
GPG Key ID: 6EC293152D90B61D
5 changed files with 695 additions and 0 deletions

View File

@ -395,6 +395,7 @@
<exclude>src/test/resources/TestConvertJSONToSQL/person-with-null-code.json</exclude>
<exclude>src/test/resources/TestConvertJSONToSQL/person-without-id.json</exclude>
<exclude>src/test/resources/TestConvertJSONToSQL/person-with-bool.json</exclude>
<exclude>src/test/resources/TestCountText/jabberwocky.txt</exclude>
<exclude>src/test/resources/TestModifyBytes/noFooter.txt</exclude>
<exclude>src/test/resources/TestModifyBytes/noFooter_noHeader.txt</exclude>
<exclude>src/test/resources/TestModifyBytes/noHeader.txt</exclude>

View File

@ -0,0 +1,327 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.util.StringUtils;
@EventDriven
@SideEffectFree
@SupportsBatching
@Tags({"count", "text", "line", "word", "character"})
@InputRequirement(Requirement.INPUT_REQUIRED)
@CapabilityDescription("Counts various metrics on incoming text. The requested results will be recorded as attributes. "
+ "The resulting flowfile will not have its content modified.")
@WritesAttributes({
@WritesAttribute(attribute = "text.line.count", description = "The number of lines of text present in the FlowFile content"),
@WritesAttribute(attribute = "text.line.nonempty.count", description = "The number of lines of text (with at least one non-whitespace character) present in the original FlowFile"),
@WritesAttribute(attribute = "text.word.count", description = "The number of words present in the original FlowFile"),
@WritesAttribute(attribute = "text.character.count", description = "The number of characters (given the specified character encoding) present in the original FlowFile"),
})
@SeeAlso(SplitText.class)
public class CountText extends AbstractProcessor {
private static final List<Charset> STANDARD_CHARSETS = Arrays.asList(
StandardCharsets.UTF_8,
StandardCharsets.US_ASCII,
StandardCharsets.ISO_8859_1,
StandardCharsets.UTF_16,
StandardCharsets.UTF_16LE,
StandardCharsets.UTF_16BE);
private static final Pattern SYMBOL_PATTERN = Pattern.compile("[\\s-\\._]");
private static final Pattern WHITESPACE_ONLY_PATTERN = Pattern.compile("\\s");
// Attribute keys
public static final String TEXT_LINE_COUNT = "text.line.count";
public static final String TEXT_LINE_NONEMPTY_COUNT = "text.line.nonempty.count";
public static final String TEXT_WORD_COUNT = "text.word.count";
public static final String TEXT_CHARACTER_COUNT = "text.character.count";
public static final PropertyDescriptor TEXT_LINE_COUNT_PD = new PropertyDescriptor.Builder()
.name("text-line-count")
.displayName("Count Lines")
.description("If enabled, will count the number of lines present in the incoming text.")
.required(true)
.allowableValues("true", "false")
.defaultValue("true")
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.build();
public static final PropertyDescriptor TEXT_LINE_NONEMPTY_COUNT_PD = new PropertyDescriptor.Builder()
.name("text-line-nonempty-count")
.displayName("Count Non-Empty Lines")
.description("If enabled, will count the number of lines that contain a non-whitespace character present in the incoming text.")
.required(true)
.allowableValues("true", "false")
.defaultValue("false")
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.build();
public static final PropertyDescriptor TEXT_WORD_COUNT_PD = new PropertyDescriptor.Builder()
.name("text-word-count")
.displayName("Count Words")
.description("If enabled, will count the number of words (alphanumeric character groups bounded by whitespace)" +
" present in the incoming text. Common logical delimiters [_-.] do not bound a word unless 'Split Words on Symbols' is true.")
.required(true)
.allowableValues("true", "false")
.defaultValue("false")
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.build();
public static final PropertyDescriptor TEXT_CHARACTER_COUNT_PD = new PropertyDescriptor.Builder()
.name("text-character-count")
.displayName("Count Characters")
.description("If enabled, will count the number of characters (including whitespace and symbols, but not including newlines and carriage returns) present in the incoming text.")
.required(true)
.allowableValues("true", "false")
.defaultValue("false")
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.build();
public static final PropertyDescriptor SPLIT_WORDS_ON_SYMBOLS_PD = new PropertyDescriptor.Builder()
.name("split-words-on-symbols")
.displayName("Split Words on Symbols")
.description("If enabled, the word count will identify strings separated by common logical delimiters [ _ - . ] as independent words (ex. split-words-on-symbols = 4 words).")
.required(true)
.allowableValues("true", "false")
.defaultValue("false")
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.build();
public static final PropertyDescriptor CHARACTER_ENCODING_PD = new PropertyDescriptor.Builder()
.name("character-encoding")
.displayName("Character Encoding")
.description("Specifies a character encoding to use.")
.required(true)
.allowableValues(getStandardCharsetNames())
.defaultValue(StandardCharsets.UTF_8.displayName())
.build();
private static Set<String> getStandardCharsetNames() {
return STANDARD_CHARSETS.stream().map(c -> c.displayName()).collect(Collectors.toSet());
}
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("The flowfile contains the original content with one or more attributes added containing the respective counts")
.build();
public static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description("If the flowfile text cannot be counted for some reason, the original file will be routed to this destination and nothing will be routed elsewhere")
.build();
private static final List<PropertyDescriptor> properties;
private static final Set<Relationship> relationships;
static {
properties = Collections.unmodifiableList(Arrays.asList(TEXT_LINE_COUNT_PD,
TEXT_LINE_NONEMPTY_COUNT_PD,
TEXT_WORD_COUNT_PD,
TEXT_CHARACTER_COUNT_PD,
SPLIT_WORDS_ON_SYMBOLS_PD,
CHARACTER_ENCODING_PD));
relationships = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(REL_SUCCESS,
REL_FAILURE)));
}
private volatile boolean countLines;
private volatile boolean countLinesNonEmpty;
private volatile boolean countWords;
private volatile boolean countCharacters;
private volatile boolean splitWordsOnSymbols;
private volatile String characterEncoding = StandardCharsets.UTF_8.name();
private volatile int lineCount;
private volatile int lineNonEmptyCount;
private volatile int wordCount;
private volatile int characterCount;
@Override
public Set<Relationship> getRelationships() {
return relationships;
}
@OnScheduled
public void onSchedule(ProcessContext context) {
this.countLines = context.getProperty(TEXT_LINE_COUNT_PD).isSet()
? context.getProperty(TEXT_LINE_COUNT_PD).asBoolean() : false;
this.countLinesNonEmpty = context.getProperty(TEXT_LINE_NONEMPTY_COUNT_PD).isSet()
? context.getProperty(TEXT_LINE_NONEMPTY_COUNT_PD).asBoolean() : false;
this.countWords = context.getProperty(TEXT_WORD_COUNT_PD).isSet()
? context.getProperty(TEXT_WORD_COUNT_PD).asBoolean() : false;
this.countCharacters = context.getProperty(TEXT_CHARACTER_COUNT_PD).isSet()
? context.getProperty(TEXT_CHARACTER_COUNT_PD).asBoolean() : false;
this.splitWordsOnSymbols = context.getProperty(SPLIT_WORDS_ON_SYMBOLS_PD).isSet()
? context.getProperty(SPLIT_WORDS_ON_SYMBOLS_PD).asBoolean() : false;
this.characterEncoding = context.getProperty(CHARACTER_ENCODING_PD).getValue();
}
/**
* Will count text attributes of the incoming stream.
*/
@Override
public void onTrigger(ProcessContext context, ProcessSession processSession) throws ProcessException {
FlowFile sourceFlowFile = processSession.get();
if (sourceFlowFile == null) {
return;
}
AtomicBoolean error = new AtomicBoolean();
lineCount = 0;
lineNonEmptyCount = 0;
wordCount = 0;
characterCount = 0;
processSession.read(sourceFlowFile, in -> {
long start = System.nanoTime();
// Iterate over the lines in the text input
try {
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(in, characterEncoding));
String line;
while ((line = bufferedReader.readLine()) != null) {
if (countLines) {
lineCount++;
}
if (countLinesNonEmpty) {
if (line.trim().length() > 0) {
lineNonEmptyCount++;
}
}
if (countWords) {
wordCount += countWordsInLine(line, splitWordsOnSymbols);
}
if (countCharacters) {
characterCount += line.length();
}
}
long stop = System.nanoTime();
if (getLogger().isDebugEnabled()) {
final long durationNanos = stop - start;
DecimalFormat df = new DecimalFormat("#.###");
getLogger().debug("Computed metrics in " + durationNanos + " nanoseconds (" + df.format(durationNanos / 1_000_000_000.0) + " seconds).");
}
if (getLogger().isInfoEnabled()) {
String message = generateMetricsMessage();
getLogger().info(message);
}
// Update session counters
processSession.adjustCounter("Lines Counted", (long) lineCount, false);
processSession.adjustCounter("Lines (non-empty) Counted", (long) lineNonEmptyCount, false);
processSession.adjustCounter("Words Counted", (long) wordCount, false);
processSession.adjustCounter("Characters Counted", (long) characterCount, false);
} catch (IOException e) {
error.set(true);
getLogger().error(e.getMessage() + " Routing to failure.", e);
}
});
if (error.get()) {
processSession.transfer(sourceFlowFile, REL_FAILURE);
} else {
Map<String, String> metricAttributes = new HashMap<>();
if (countLines) {
metricAttributes.put(TEXT_LINE_COUNT, String.valueOf(lineCount));
}
if (countLinesNonEmpty) {
metricAttributes.put(TEXT_LINE_NONEMPTY_COUNT, String.valueOf(lineNonEmptyCount));
}
if (countWords) {
metricAttributes.put(TEXT_WORD_COUNT, String.valueOf(wordCount));
}
if (countCharacters) {
metricAttributes.put(TEXT_CHARACTER_COUNT, String.valueOf(characterCount));
}
FlowFile updatedFlowFile = processSession.putAllAttributes(sourceFlowFile, metricAttributes);
processSession.transfer(updatedFlowFile, REL_SUCCESS);
}
}
private String generateMetricsMessage() {
StringBuilder sb = new StringBuilder("Counted ");
List<String> metrics = new ArrayList<>();
if (countLines) {
metrics.add(lineCount + " lines");
}
if (countLinesNonEmpty) {
metrics.add(lineNonEmptyCount + " non-empty lines");
}
if (countWords) {
metrics.add(wordCount + " words");
}
if (countCharacters) {
metrics.add(characterCount + " characters");
}
sb.append(StringUtils.join(metrics, ", "));
return sb.toString();
}
int countWordsInLine(String line, boolean splitWordsOnSymbols) throws IOException {
if (line == null || line.trim().length() == 0) {
return 0;
} else {
Pattern regex = splitWordsOnSymbols ? SYMBOL_PATTERN : WHITESPACE_ONLY_PATTERN;
final String[] words = regex.split(line);
// TODO: Trim individual words before counting to eliminate whitespace words?
if (getLogger().isDebugEnabled()) {
getLogger().debug("Split [" + line + "] to [" + StringUtils.join(Arrays.asList(words), ", ") + "] (" + words.length + ")");
}
return words.length;
}
}
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return properties;
}
}

View File

@ -19,6 +19,7 @@ org.apache.nifi.processors.standard.ControlRate
org.apache.nifi.processors.standard.ConvertCharacterSet
org.apache.nifi.processors.standard.ConvertJSONToSQL
org.apache.nifi.processors.standard.ConvertRecord
org.apache.nifi.processors.standard.CountText
org.apache.nifi.processors.standard.DebugFlow
org.apache.nifi.processors.standard.DetectDuplicate
org.apache.nifi.processors.standard.DistributeLoad

View File

@ -0,0 +1,332 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License") you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard
import org.apache.nifi.components.PropertyDescriptor
import org.apache.nifi.flowfile.FlowFile
import org.apache.nifi.util.MockProcessSession
import org.apache.nifi.util.TestRunner
import org.apache.nifi.util.TestRunners
import org.bouncycastle.jce.provider.BouncyCastleProvider
import org.junit.After
import org.junit.Before
import org.junit.BeforeClass
import org.junit.Test
import org.junit.runner.RunWith
import org.junit.runners.JUnit4
import org.mockito.Mockito
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import java.nio.charset.StandardCharsets
import java.security.Security
import static org.mockito.Matchers.anyBoolean
import static org.mockito.Matchers.anyString
import static org.mockito.Mockito.when
@RunWith(JUnit4.class)
class CountTextTest extends GroovyTestCase {
private static final Logger logger = LoggerFactory.getLogger(CountTextTest.class)
private static final String TLC = "text.line.count"
private static final String TLNEC = "text.line.nonempty.count"
private static final String TWC = "text.word.count"
private static final String TCC = "text.character.count"
@BeforeClass
static void setUpOnce() throws Exception {
Security.addProvider(new BouncyCastleProvider())
logger.metaClass.methodMissing = { String name, args ->
logger.info("[${name?.toUpperCase()}] ${(args as List).join(" ")}")
}
}
@Before
void setUp() throws Exception {
}
@After
void tearDown() throws Exception {
}
@Test
void testShouldCountAllMetrics() throws Exception {
// Arrange
final TestRunner runner = TestRunners.newTestRunner(CountText.class)
runner.setProperty(CountText.TEXT_LINE_COUNT_PD, "true")
runner.setProperty(CountText.TEXT_LINE_NONEMPTY_COUNT_PD, "true")
runner.setProperty(CountText.TEXT_WORD_COUNT_PD, "true")
runner.setProperty(CountText.TEXT_CHARACTER_COUNT_PD, "true")
// This text is the same as in src/test/resources/TestCountText/jabberwocky.txt but is copied here
// to ensure that reading from a file vs. static text doesn't cause line break issues
String INPUT_TEXT = """Twas brillig, and the slithy toves
Did gyre and gimble in the wade;
All mimsy were the borogoves,
And the mome raths outgrabe.
"Beware the Jabberwock, my son!
The jaws that bite, the claws that catch!
Beware the Jubjub bird, and shun
The frumious Bandersnatch!"
He took his vorpal sword in hand:
Long time the manxome foe he sought
So rested he by the Tumtum tree,
And stood awhile in thought.
And as in uffish thought he stood,
The Jabberwock, with eyes of flame,
Came whiffling through the tulgey wood.
And burbled as it came!
One, two! One, two! And through and through
The vorpal blade went snicker-snack!
He left it dead, and with its head
He went galumphing back.
"And hast thou slain the Jabberwock?
Come to my arms, my beamish boy!
O frabjous day! Callooh! Callay!"
He chortled in his joy.
Twas brillig, and the slithy toves
Did gyre and gimble in the wabe;
All mimsy were the borogoves,
And the mome raths outgrabe."""
runner.enqueue(INPUT_TEXT.bytes)
// Act
runner.run()
// Assert
runner.assertAllFlowFilesTransferred(CountText.REL_SUCCESS, 1)
FlowFile flowFile = runner.getFlowFilesForRelationship(CountText.REL_SUCCESS).first()
assert flowFile.attributes."$TLC" == 34 as String
assert flowFile.attributes."$TLNEC" == 28 as String
assert flowFile.attributes."$TWC" == 166 as String
assert flowFile.attributes."$TCC" == 900 as String
}
@Test
void testShouldCountEachMetric() throws Exception {
// Arrange
final TestRunner runner = TestRunners.newTestRunner(CountText.class)
String INPUT_TEXT = new File("src/test/resources/TestCountText/jabberwocky.txt").text
final def EXPECTED_VALUES = [
(TLC) : 34,
(TLNEC): 28,
(TWC) : 166,
(TCC) : 900,
]
def linesOnly = [(CountText.TEXT_LINE_COUNT_PD): "true"]
def linesNonEmptyOnly = [(CountText.TEXT_LINE_NONEMPTY_COUNT_PD): "true"]
def wordsOnly = [(CountText.TEXT_WORD_COUNT_PD): "true"]
def charactersOnly = [(CountText.TEXT_CHARACTER_COUNT_PD): "true"]
final List<Map<PropertyDescriptor, String>> SCENARIOS = [linesOnly, linesNonEmptyOnly, wordsOnly, charactersOnly]
SCENARIOS.each { map ->
// Reset the processor properties
runner.setProperty(CountText.TEXT_LINE_COUNT_PD, "false")
runner.setProperty(CountText.TEXT_LINE_NONEMPTY_COUNT_PD, "false")
runner.setProperty(CountText.TEXT_WORD_COUNT_PD, "false")
runner.setProperty(CountText.TEXT_CHARACTER_COUNT_PD, "false")
// Apply the scenario-specific properties
map.each { key, value ->
runner.setProperty(key, value)
}
runner.clearProvenanceEvents()
runner.clearTransferState()
runner.enqueue(INPUT_TEXT.bytes)
// Act
runner.run()
// Assert
runner.assertAllFlowFilesTransferred(CountText.REL_SUCCESS, 1)
FlowFile flowFile = runner.getFlowFilesForRelationship(CountText.REL_SUCCESS).first()
logger.info("Generated flowfile: ${flowFile} | ${flowFile.attributes}")
EXPECTED_VALUES.each { key, value ->
if (flowFile.attributes.containsKey(key)) {
assert flowFile.attributes.get(key) == value as String
}
}
}
}
@Test
void testShouldCountWordsSplitOnSymbol() throws Exception {
// Arrange
final TestRunner runner = TestRunners.newTestRunner(CountText.class)
String INPUT_TEXT = new File("src/test/resources/TestCountText/jabberwocky.txt").text
final int EXPECTED_WORD_COUNT = 167
// Reset the processor properties
runner.setProperty(CountText.TEXT_LINE_COUNT_PD, "false")
runner.setProperty(CountText.TEXT_LINE_NONEMPTY_COUNT_PD, "false")
runner.setProperty(CountText.TEXT_WORD_COUNT_PD, "true")
runner.setProperty(CountText.TEXT_CHARACTER_COUNT_PD, "false")
runner.setProperty(CountText.SPLIT_WORDS_ON_SYMBOLS_PD, "true")
runner.clearProvenanceEvents()
runner.clearTransferState()
runner.enqueue(INPUT_TEXT.bytes)
// Act
runner.run()
// Assert
runner.assertAllFlowFilesTransferred(CountText.REL_SUCCESS, 1)
FlowFile flowFile = runner.getFlowFilesForRelationship(CountText.REL_SUCCESS).first()
logger.info("Generated flowfile: ${flowFile} | ${flowFile.attributes}")
assert flowFile.attributes.get(CountText.TEXT_WORD_COUNT) == EXPECTED_WORD_COUNT as String
}
@Test
void testShouldCountIndependentlyPerFlowFile() throws Exception {
// Arrange
final TestRunner runner = TestRunners.newTestRunner(CountText.class)
String INPUT_TEXT = new File("src/test/resources/TestCountText/jabberwocky.txt").text
final def EXPECTED_VALUES = [
(TLC) : 34,
(TLNEC): 28,
(TWC) : 166,
(TCC) : 900,
]
// Reset the processor properties
runner.setProperty(CountText.TEXT_LINE_COUNT_PD, "true")
runner.setProperty(CountText.TEXT_LINE_NONEMPTY_COUNT_PD, "true")
runner.setProperty(CountText.TEXT_WORD_COUNT_PD, "true")
runner.setProperty(CountText.TEXT_CHARACTER_COUNT_PD, "true")
2.times { int i ->
runner.clearProvenanceEvents()
runner.clearTransferState()
runner.enqueue(INPUT_TEXT.bytes)
// Act
runner.run()
// Assert
runner.assertAllFlowFilesTransferred(CountText.REL_SUCCESS, 1)
FlowFile flowFile = runner.getFlowFilesForRelationship(CountText.REL_SUCCESS).first()
logger.info("Generated flowfile: ${flowFile} | ${flowFile.attributes}")
EXPECTED_VALUES.each { key, value ->
if (flowFile.attributes.containsKey(key)) {
assert flowFile.attributes.get(key) == value as String
}
}
}
}
@Test
void testShouldTrackSessionCountersAcrossMultipleFlowfiles() throws Exception {
// Arrange
final TestRunner runner = TestRunners.newTestRunner(CountText.class)
String INPUT_TEXT = new File("src/test/resources/TestCountText/jabberwocky.txt").text
final def EXPECTED_VALUES = [
(TLC) : 34,
(TLNEC): 28,
(TWC) : 166,
(TCC) : 900,
]
// Reset the processor properties
runner.setProperty(CountText.TEXT_LINE_COUNT_PD, "true")
runner.setProperty(CountText.TEXT_LINE_NONEMPTY_COUNT_PD, "true")
runner.setProperty(CountText.TEXT_WORD_COUNT_PD, "true")
runner.setProperty(CountText.TEXT_CHARACTER_COUNT_PD, "true")
MockProcessSession mockPS = runner.processSessionFactory.createSession()
def sessionCounters = mockPS.sharedState.counterMap
logger.info("Session counters (0): ${sessionCounters}")
int n = 2
n.times { int i ->
runner.clearTransferState()
runner.enqueue(INPUT_TEXT.bytes)
// Act
runner.run()
logger.info("Session counters (${i + 1}): ${sessionCounters}")
// Assert
runner.assertAllFlowFilesTransferred(CountText.REL_SUCCESS, 1)
FlowFile flowFile = runner.getFlowFilesForRelationship(CountText.REL_SUCCESS).first()
logger.info("Generated flowfile: ${flowFile} | ${flowFile.attributes}")
EXPECTED_VALUES.each { key, value ->
if (flowFile.attributes.containsKey(key)) {
assert flowFile.attributes.get(key) == value as String
}
}
}
assert sessionCounters.get("Lines Counted").get() == EXPECTED_VALUES[TLC] * n as long
assert sessionCounters.get("Lines (non-empty) Counted").get() == EXPECTED_VALUES[TLNEC] * n as long
assert sessionCounters.get("Words Counted").get() == EXPECTED_VALUES[TWC] * n as long
assert sessionCounters.get("Characters Counted").get() == EXPECTED_VALUES[TCC] * n as long
}
@Test
void testShouldHandleInternalError() throws Exception {
// Arrange
CountText ct = new CountText()
ct.countLines = true
ct.countLinesNonEmpty = true
ct.countWords = true
ct.countCharacters = true
CountText ctSpy = Mockito.spy(ct)
when(ctSpy.countWordsInLine(anyString(), anyBoolean())).thenThrow(new IOException("Expected exception"))
final TestRunner runner = TestRunners.newTestRunner(ctSpy)
final String INPUT_TEXT = "This flowfile should throw an error"
// Reset the processor properties
runner.setProperty(CountText.TEXT_LINE_COUNT_PD, "true")
runner.setProperty(CountText.TEXT_LINE_NONEMPTY_COUNT_PD, "true")
runner.setProperty(CountText.TEXT_WORD_COUNT_PD, "true")
runner.setProperty(CountText.TEXT_CHARACTER_COUNT_PD, "true")
runner.setProperty(CountText.CHARACTER_ENCODING_PD, StandardCharsets.US_ASCII.displayName())
runner.enqueue(INPUT_TEXT.bytes)
// Act
// Need initialize = true to run #onScheduled()
runner.run(1, true, true)
// Assert
runner.assertAllFlowFilesTransferred(CountText.REL_FAILURE, 1)
FlowFile flowFile = runner.getFlowFilesForRelationship(CountText.REL_FAILURE).first()
logger.info("Generated flowfile: ${flowFile} | ${flowFile.attributes}")
}
}

View File

@ -0,0 +1,34 @@
Twas brillig, and the slithy toves
Did gyre and gimble in the wade;
All mimsy were the borogoves,
And the mome raths outgrabe.
"Beware the Jabberwock, my son!
The jaws that bite, the claws that catch!
Beware the Jubjub bird, and shun
The frumious Bandersnatch!"
He took his vorpal sword in hand:
Long time the manxome foe he sought—
So rested he by the Tumtum tree,
And stood awhile in thought.
And as in uffish thought he stood,
The Jabberwock, with eyes of flame,
Came whiffling through the tulgey wood.
And burbled as it came!
One, two! One, two! And through and through
The vorpal blade went snicker-snack!
He left it dead, and with its head
He went galumphing back.
"And hast thou slain the Jabberwock?
Come to my arms, my beamish boy!
O frabjous day! Callooh! Callay!"
He chortled in his joy.
Twas brillig, and the slithy toves
Did gyre and gimble in the wabe;
All mimsy were the borogoves,
And the mome raths outgrabe.