NIFI-6395: Thread-safety bug fixed and added new flag property to handle the way to adjust the counters

Update CountText.java

Local variable changes by AtomicInteger

NIFI-6395 - Fix line is longer than 200 characters

This closes #3552.

Signed-off-by: Koji Kawamura <ijokarumawak@apache.org>
This commit is contained in:
Ivan Ezequiel Rodriguez 2019-06-26 02:10:13 -03:00 committed by Koji Kawamura
parent af0777b2c9
commit 7f96fa1d0d
No known key found for this signature in database
GPG Key ID: 36136B0EC89E4758
1 changed files with 34 additions and 25 deletions

View File

@ -31,6 +31,7 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern; import java.util.regex.Pattern;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import org.apache.nifi.annotation.behavior.EventDriven; import org.apache.nifi.annotation.behavior.EventDriven;
@ -141,7 +142,16 @@ public class CountText extends AbstractProcessor {
.allowableValues(getStandardCharsetNames()) .allowableValues(getStandardCharsetNames())
.defaultValue(StandardCharsets.UTF_8.displayName()) .defaultValue(StandardCharsets.UTF_8.displayName())
.build(); .build();
public static final PropertyDescriptor ADJUST_IMMEDIATELY = new PropertyDescriptor.Builder()
.name("ajust-immediately")
.displayName("Call Immediate Adjustment")
.description("If true, the counter will be updated immediately, without regard to whether the ProcessSession is commit or rolled back;" +
"otherwise, the counter will be incremented only if and when the ProcessSession is committed.")
.required(true)
.allowableValues("true", "false")
.defaultValue("false")
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.build();
private static Set<String> getStandardCharsetNames() { private static Set<String> getStandardCharsetNames() {
return STANDARD_CHARSETS.stream().map(c -> c.displayName()).collect(Collectors.toSet()); return STANDARD_CHARSETS.stream().map(c -> c.displayName()).collect(Collectors.toSet());
} }
@ -164,7 +174,8 @@ public class CountText extends AbstractProcessor {
TEXT_WORD_COUNT_PD, TEXT_WORD_COUNT_PD,
TEXT_CHARACTER_COUNT_PD, TEXT_CHARACTER_COUNT_PD,
SPLIT_WORDS_ON_SYMBOLS_PD, SPLIT_WORDS_ON_SYMBOLS_PD,
CHARACTER_ENCODING_PD)); CHARACTER_ENCODING_PD,
ADJUST_IMMEDIATELY));
relationships = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(REL_SUCCESS, relationships = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(REL_SUCCESS,
REL_FAILURE))); REL_FAILURE)));
@ -175,13 +186,9 @@ public class CountText extends AbstractProcessor {
private volatile boolean countWords; private volatile boolean countWords;
private volatile boolean countCharacters; private volatile boolean countCharacters;
private volatile boolean splitWordsOnSymbols; private volatile boolean splitWordsOnSymbols;
private volatile boolean adjustImmediately;
private volatile String characterEncoding = StandardCharsets.UTF_8.name(); private volatile String characterEncoding = StandardCharsets.UTF_8.name();
private volatile int lineCount;
private volatile int lineNonEmptyCount;
private volatile int wordCount;
private volatile int characterCount;
@Override @Override
public Set<Relationship> getRelationships() { public Set<Relationship> getRelationships() {
return relationships; return relationships;
@ -199,6 +206,8 @@ public class CountText extends AbstractProcessor {
? context.getProperty(TEXT_CHARACTER_COUNT_PD).asBoolean() : false; ? context.getProperty(TEXT_CHARACTER_COUNT_PD).asBoolean() : false;
this.splitWordsOnSymbols = context.getProperty(SPLIT_WORDS_ON_SYMBOLS_PD).isSet() this.splitWordsOnSymbols = context.getProperty(SPLIT_WORDS_ON_SYMBOLS_PD).isSet()
? context.getProperty(SPLIT_WORDS_ON_SYMBOLS_PD).asBoolean() : false; ? context.getProperty(SPLIT_WORDS_ON_SYMBOLS_PD).asBoolean() : false;
this.adjustImmediately = context.getProperty(ADJUST_IMMEDIATELY).isSet()
? context.getProperty(ADJUST_IMMEDIATELY).asBoolean() : false;
this.characterEncoding = context.getProperty(CHARACTER_ENCODING_PD).getValue(); this.characterEncoding = context.getProperty(CHARACTER_ENCODING_PD).getValue();
} }
@ -213,10 +222,10 @@ public class CountText extends AbstractProcessor {
} }
AtomicBoolean error = new AtomicBoolean(); AtomicBoolean error = new AtomicBoolean();
lineCount = 0; final AtomicInteger lineCount = new AtomicInteger(0);
lineNonEmptyCount = 0; final AtomicInteger lineNonEmptyCount = new AtomicInteger(0);
wordCount = 0; final AtomicInteger wordCount = new AtomicInteger(0);
characterCount = 0; final AtomicInteger characterCount = new AtomicInteger(0);
processSession.read(sourceFlowFile, in -> { processSession.read(sourceFlowFile, in -> {
long start = System.nanoTime(); long start = System.nanoTime();
@ -227,21 +236,21 @@ public class CountText extends AbstractProcessor {
String line; String line;
while ((line = bufferedReader.readLine()) != null) { while ((line = bufferedReader.readLine()) != null) {
if (countLines) { if (countLines) {
lineCount++; lineCount.incrementAndGet();
} }
if (countLinesNonEmpty) { if (countLinesNonEmpty) {
if (line.trim().length() > 0) { if (line.trim().length() > 0) {
lineNonEmptyCount++; lineNonEmptyCount.incrementAndGet();
} }
} }
if (countWords) { if (countWords) {
wordCount += countWordsInLine(line, splitWordsOnSymbols); wordCount.addAndGet(countWordsInLine(line, splitWordsOnSymbols));
} }
if (countCharacters) { if (countCharacters) {
characterCount += line.length(); characterCount.addAndGet(line.length());
} }
} }
long stop = System.nanoTime(); long stop = System.nanoTime();
@ -251,15 +260,15 @@ public class CountText extends AbstractProcessor {
getLogger().debug("Computed metrics in " + durationNanos + " nanoseconds (" + df.format(durationNanos / 1_000_000_000.0) + " seconds)."); getLogger().debug("Computed metrics in " + durationNanos + " nanoseconds (" + df.format(durationNanos / 1_000_000_000.0) + " seconds).");
} }
if (getLogger().isInfoEnabled()) { if (getLogger().isInfoEnabled()) {
String message = generateMetricsMessage(); String message = generateMetricsMessage(lineCount.get(), lineNonEmptyCount.get(), wordCount.get(), characterCount.get());
getLogger().info(message); getLogger().info(message);
} }
// Update session counters // Update session counters
processSession.adjustCounter("Lines Counted", (long) lineCount, false); processSession.adjustCounter("Lines Counted", (long) lineCount.get(), adjustImmediately);
processSession.adjustCounter("Lines (non-empty) Counted", (long) lineNonEmptyCount, false); processSession.adjustCounter("Lines (non-empty) Counted", (long) lineNonEmptyCount.get(), adjustImmediately);
processSession.adjustCounter("Words Counted", (long) wordCount, false); processSession.adjustCounter("Words Counted", (long) wordCount.get(), adjustImmediately);
processSession.adjustCounter("Characters Counted", (long) characterCount, false); processSession.adjustCounter("Characters Counted", (long) characterCount.get(), adjustImmediately);
} catch (IOException e) { } catch (IOException e) {
error.set(true); error.set(true);
getLogger().error(e.getMessage() + " Routing to failure.", e); getLogger().error(e.getMessage() + " Routing to failure.", e);
@ -271,23 +280,23 @@ public class CountText extends AbstractProcessor {
} else { } else {
Map<String, String> metricAttributes = new HashMap<>(); Map<String, String> metricAttributes = new HashMap<>();
if (countLines) { if (countLines) {
metricAttributes.put(TEXT_LINE_COUNT, String.valueOf(lineCount)); metricAttributes.put(TEXT_LINE_COUNT, String.valueOf(lineCount.get()));
} }
if (countLinesNonEmpty) { if (countLinesNonEmpty) {
metricAttributes.put(TEXT_LINE_NONEMPTY_COUNT, String.valueOf(lineNonEmptyCount)); metricAttributes.put(TEXT_LINE_NONEMPTY_COUNT, String.valueOf(lineNonEmptyCount.get()));
} }
if (countWords) { if (countWords) {
metricAttributes.put(TEXT_WORD_COUNT, String.valueOf(wordCount)); metricAttributes.put(TEXT_WORD_COUNT, String.valueOf(wordCount.get()));
} }
if (countCharacters) { if (countCharacters) {
metricAttributes.put(TEXT_CHARACTER_COUNT, String.valueOf(characterCount)); metricAttributes.put(TEXT_CHARACTER_COUNT, String.valueOf(characterCount.get()));
} }
FlowFile updatedFlowFile = processSession.putAllAttributes(sourceFlowFile, metricAttributes); FlowFile updatedFlowFile = processSession.putAllAttributes(sourceFlowFile, metricAttributes);
processSession.transfer(updatedFlowFile, REL_SUCCESS); processSession.transfer(updatedFlowFile, REL_SUCCESS);
} }
} }
private String generateMetricsMessage() { private String generateMetricsMessage(int lineCount, int lineNonEmptyCount, int wordCount, int characterCount) {
StringBuilder sb = new StringBuilder("Counted "); StringBuilder sb = new StringBuilder("Counted ");
List<String> metrics = new ArrayList<>(); List<String> metrics = new ArrayList<>();
if (countLines) { if (countLines) {