From 564ad0cd71e56b426e02757c7a0a70e28ccbea12 Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Thu, 18 Oct 2018 12:05:16 -0400 Subject: [PATCH] NIFI-5718: Implemented LineDemarcator and removed NLKBufferedReader in order to improve performance --- .../nifi/stream/io/RepeatingInputStream.java | 103 ++++++++++++ .../io/util/AbstractTextDemarcator.java | 147 ++++++++++++++++++ .../nifi/stream/io/util/LineDemarcator.java | 116 ++++++++++++++ .../stream/io/util/TestLineDemarcator.java | 120 ++++++++++++++ .../nifi/processors/standard/ReplaceText.java | 71 +++++---- .../nifi/processors/standard/RouteText.java | 49 +++--- .../standard/util/NLKBufferedReader.java | 76 --------- 7 files changed, 549 insertions(+), 133 deletions(-) create mode 100644 nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/RepeatingInputStream.java create mode 100644 nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/util/AbstractTextDemarcator.java create mode 100644 nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/util/LineDemarcator.java create mode 100644 nifi-commons/nifi-utils/src/test/java/org/apache/nifi/stream/io/util/TestLineDemarcator.java delete mode 100644 nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/NLKBufferedReader.java diff --git a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/RepeatingInputStream.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/RepeatingInputStream.java new file mode 100644 index 0000000000..f542741a1a --- /dev/null +++ b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/RepeatingInputStream.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.stream.io; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.Objects; + +public class RepeatingInputStream extends InputStream { + private final byte[] toRepeat; + private final int maxIterations; + + private InputStream bais; + private int repeatCount; + + + public RepeatingInputStream(final byte[] toRepeat, final int iterations) { + if (iterations < 1) { + throw new IllegalArgumentException("iterations must be at least 1 but was " + iterations); + } + if (Objects.requireNonNull(toRepeat, "toRepeat").length == 0) { + throw new IllegalArgumentException("toRepeat must not be empty"); + } + + this.toRepeat = toRepeat; + this.maxIterations = iterations; + + // Begin the first pass directly; repeat() is only for restarting once a pass is exhausted. + bais = new ByteArrayInputStream(toRepeat); + repeatCount = 1; + } + + @Override + public int read() throws IOException { + final int value = bais.read(); + if (value > -1) { + return value; + } + + final boolean repeated = repeat(); + if (repeated) { + return bais.read(); + } + + return -1; + } + + @Override + public int read(final byte[] b, final int off, final int len) throws IOException { + final int value = bais.read(b, off, len); + if (value > -1) { + return value; + } + + final boolean repeated = repeat(); + if (repeated) { + return bais.read(b, off, len); + } + + return -1; + } + + @Override + public int read(final byte[] b) throws IOException { + final int value = bais.read(b); + if (value > -1) { + return value; + } 
+ + final boolean repeated = repeat(); + if (repeated) { + return bais.read(b); + } + + return -1; + } + + private boolean repeat() { + if (repeatCount >= maxIterations) { + return false; + } + + repeatCount++; + bais = new ByteArrayInputStream(toRepeat); + + return true; + } +} diff --git a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/util/AbstractTextDemarcator.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/util/AbstractTextDemarcator.java new file mode 100644 index 0000000000..f10f66df3c --- /dev/null +++ b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/util/AbstractTextDemarcator.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.stream.io.util; + +import org.apache.nifi.stream.io.exception.TokenTooLargeException; + +import java.io.Closeable; +import java.io.IOException; +import java.io.InputStream; +import java.io.Reader; +import java.nio.BufferOverflowException; + +public abstract class AbstractTextDemarcator implements Closeable { + + private static final int INIT_BUFFER_SIZE = 8192; + + private final Reader reader; + + /* + * The maximum allowed size of the token. 
In the event such size would be exceeded + * while growing the buffer, fill() throws a BufferOverflowException. + */ + private final int maxDataSize; + + /* + * Buffer into which the chars are read from the provided Reader. The size + * of the buffer is defined by the 'initialBufferSize' provided in the + * constructor or defaults to the value of INIT_BUFFER_SIZE constant. + */ + char[] buffer; + + /* + * Offset within the current 'buffer' of the next char to examine; fill() reads new data in starting here. + */ + int index; + + /* + * Starting offset of the demarcated token within the current 'buffer'. Keep + * in mind that while most of the time it is the same as the 'index' it may + * also have a value of 0 at which point it serves as a signal to the fill() + * operation that buffer needs to be expanded if end of token is not reached + * (see fill() operation for more details). + */ + int mark; + + /* + * The number of chars in the buffer that are valid for reading. It is different from the + * buffer length, since this number may be smaller (e.g., at the end of the + * stream) than the actual buffer length. It is set by the fill() operation + * every time more chars are read into the buffer, and to -1 at end of stream. + */ + int availableBytesLength; + + /** + * Constructs an instance of demarcator with the provided {@link Reader} + * (for example one decoding an {@link InputStream}) and max data size. Each + * demarcated token must fit within that size, otherwise an exception is raised. + */ + AbstractTextDemarcator(Reader reader, int maxDataSize) { + this(reader, maxDataSize, INIT_BUFFER_SIZE); + } + + /** + * Constructs an instance of demarcator with the provided {@link Reader}, + * max data size and initial buffer size. Each demarcated token must + * fit within the max data size, otherwise an exception is raised. 
+ */ + AbstractTextDemarcator(Reader reader, int maxDataSize, int initialBufferSize) { + this.validate(reader, maxDataSize, initialBufferSize); + this.reader = reader; + this.buffer = new char[initialBufferSize]; + this.maxDataSize = maxDataSize; + } + + @Override + public void close() throws IOException { + reader.close(); + } + + /** + * Will fill the current buffer from current 'index' position, expanding it + * and/or shifting it if necessary. If the buffer is full and cannot grow within the max + * data size a {@link BufferOverflowException} (not {@link TokenTooLargeException}) is thrown. + * + * @throws IOException + * if unable to read from the stream + */ + void fill() throws IOException { + if (this.index >= this.buffer.length) { + if (this.mark == 0) { // expand + if (this.buffer.length >= maxDataSize) { // already at the limit: the token cannot fit + throw new BufferOverflowException(); + } + // grow using long arithmetic (no int overflow) and never beyond maxDataSize + long expandedSize = Math.min(maxDataSize, Math.min(this.buffer.length * 2L, this.buffer.length + 1_048_576L)); + char[] newBuff = new char[(int) expandedSize]; + System.arraycopy(this.buffer, 0, newBuff, 0, this.buffer.length); + this.buffer = newBuff; + } else { // shift the data left in the buffer + int length = this.index - this.mark; + System.arraycopy(this.buffer, this.mark, this.buffer, 0, length); + this.index = length; + this.mark = 0; + } + } + + int bytesRead; + /* + * The do/while pattern is used here similar to the way it is used in + * BufferedReader essentially protecting from assuming the EOS until it + * actually is since not every implementation of Reader guarantees + * that chars are always available while the stream is open. + */ + do { + bytesRead = reader.read(this.buffer, this.index, this.buffer.length - this.index); + } while (bytesRead == 0); + this.availableBytesLength = bytesRead != -1 ? 
this.index + bytesRead : -1; + } + + + /** + * Validates prerequisites for constructor arguments + */ + private void validate(Reader reader, int maxDataSize, int initialBufferSize) { + if (reader == null) { + throw new IllegalArgumentException("'reader' must not be null"); + } else if (maxDataSize <= 0) { + throw new IllegalArgumentException("'maxDataSize' must be > 0"); + } else if (initialBufferSize <= 0) { + throw new IllegalArgumentException("'initialBufferSize' must be > 0"); + } + } +} diff --git a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/util/LineDemarcator.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/util/LineDemarcator.java new file mode 100644 index 0000000000..c08b1a4274 --- /dev/null +++ b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/util/LineDemarcator.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.stream.io.util; + +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.Reader; +import java.nio.charset.Charset; + +/** + * A demarcator that scans an InputStream for line endings (carriage returns and new lines) and returns + * lines of text one-at-a-time. This is similar to BufferedReader but with a very important distinction: while + * BufferedReader returns the lines of text after stripping off any line endings, this class returns text including the + * line endings. So, for example, if the following text is provided: + * + * ABC\rXYZ\nABCXYZ\r\nhello + * + * Then calls to {@link #nextLine()} will result in 4 String values being returned: + * + * {@code ABC\r}, {@code XYZ\n}, {@code ABCXYZ\r\n} and {@code hello} + * + * All subsequent calls to {@link #nextLine()} will return null. + */ +public class LineDemarcator extends AbstractTextDemarcator { + private static final char CARRIAGE_RETURN = '\r'; + private static final char NEW_LINE = '\n'; + + private char lastChar; + + public LineDemarcator(final InputStream in, final Charset charset, final int maxDataSize, final int initialBufferSize) { + this(new InputStreamReader(in, charset), maxDataSize, initialBufferSize); + } + + public LineDemarcator(final Reader reader, final int maxDataSize, final int initialBufferSize) { + super(reader, maxDataSize, initialBufferSize); + } + + /** + * Will read the next line of text from the {@link InputStream} returning null + * when it reaches the end of the stream. 
+ * + * @throws IOException if unable to read from the stream + */ + public String nextLine() throws IOException { + while (this.availableBytesLength != -1) { + if (this.index >= this.availableBytesLength) { + this.fill(); + } + + if (this.availableBytesLength != -1) { + char charVal; + int i; + for (i = this.index; i < this.availableBytesLength; i++) { + charVal = this.buffer[i]; + + try { + if (charVal == NEW_LINE) { + this.index = i + 1; + + final int size = this.index - this.mark; + final String line = new String(this.buffer, mark, size); + + this.mark = this.index; + return line; + } else if (lastChar == CARRIAGE_RETURN) { + // Point this.index to i+1 because that's the next byte that we want to consume. + this.index = i + 1; + + // Size is equal to where the line began, up to index-1 because we don't want to consume the last byte encountered. + final int size = this.index - 1 - this.mark; + final String line = new String(this.buffer, mark, size); + + // set 'mark' to index - 1 because we don't want to consume the last byte that we've encountered, since we're basing our + // line on the previous byte. + this.mark = this.index - 1; + return line; + } + } finally { + lastChar = charVal; + } + } + + this.index = i; + } else { + final int size = this.index - this.mark; + if (size == 0) { + return null; + } + + return new String(this.buffer, mark, size); + } + } + + return null; + } +} diff --git a/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/stream/io/util/TestLineDemarcator.java b/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/stream/io/util/TestLineDemarcator.java new file mode 100644 index 0000000000..768a60a033 --- /dev/null +++ b/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/stream/io/util/TestLineDemarcator.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.stream.io.util; + +import org.junit.Test; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.Reader; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import static org.junit.Assert.assertEquals; + +public class TestLineDemarcator { + + @Test + public void testSingleCharacterLines() throws IOException { + final String input = "A\nB\nC\rD\r\nE\r\nF\r\rG"; + + final List lines = getLines(input); + assertEquals(Arrays.asList("A\n", "B\n", "C\r", "D\r\n", "E\r\n", "F\r", "\r", "G"), lines); + } + + + @Test + public void testEmptyStream() throws IOException { + final List lines = getLines(""); + assertEquals(Collections.emptyList(), lines); + } + + @Test + public void testOnlyEmptyLines() throws IOException { + final String input = "\r\r\r\n\n\n\r\n"; + + final List lines = getLines(input); + assertEquals(Arrays.asList("\r", "\r", "\r\n", "\n", "\n", "\r\n"), lines); + } + + @Test + public void testOnBufferSplit() throws IOException { + final String input = "ABC\r\nXYZ"; + final List lines = getLines(input, 10, 4); + + assertEquals(Arrays.asList("ABC\r\n", "XYZ"), lines); + } + + @Test 
+ public void testEndsWithCarriageReturn() throws IOException { + final List lines = getLines("ABC\r"); + assertEquals(Arrays.asList("ABC\r"), lines); + } + + @Test + public void testEndsWithNewLine() throws IOException { + final List lines = getLines("ABC\n"); + assertEquals(Arrays.asList("ABC\n"), lines); + } + + @Test + public void testEndsWithCarriageReturnNewLine() throws IOException { + final List lines = getLines("ABC\r\n"); + assertEquals(Arrays.asList("ABC\r\n"), lines); + } + + @Test + public void testReadAheadInIsEol() throws IOException { + final String input = "he\ra-to-a\rb-to-b\rc-to-c\r\nd-to-d"; + final List lines = getLines(input, 10, 10); + + assertEquals(Arrays.asList("he\r", "a-to-a\r", "b-to-b\r", "c-to-c\r\n", "d-to-d"), lines); + } + + @Test + public void testFirstCharMatchOnly() throws IOException { + final List lines = getLines("\nThe quick brown fox jumped over the lazy dog."); + assertEquals(Arrays.asList("\n", "The quick brown fox jumped over the lazy dog."), lines); + } + + private List getLines(final String text) throws IOException { + return getLines(text, 8192, 8192); + } + + private List getLines(final String text, final int maxDataSize, final int bufferSize) throws IOException { + final byte[] bytes = text.getBytes(StandardCharsets.UTF_8); + + final List lines = new ArrayList<>(); + + try (final ByteArrayInputStream bais = new ByteArrayInputStream(bytes); + final Reader reader = new InputStreamReader(bais, StandardCharsets.UTF_8); + final LineDemarcator demarcator = new LineDemarcator(reader, maxDataSize, bufferSize)) { + + String line; + while ((line = demarcator.nextLine()) != null) { + lines.add(line); + } + } + + return lines; + } + +} diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ReplaceText.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ReplaceText.java index 
3108a6cf8c..c6aec0c171 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ReplaceText.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ReplaceText.java @@ -49,14 +49,13 @@ import org.apache.nifi.processor.io.InputStreamCallback; import org.apache.nifi.processor.io.OutputStreamCallback; import org.apache.nifi.processor.io.StreamCallback; import org.apache.nifi.processor.util.StandardValidators; -import org.apache.nifi.processors.standard.util.NLKBufferedReader; import org.apache.nifi.stream.io.StreamUtils; +import org.apache.nifi.stream.io.util.LineDemarcator; import org.apache.nifi.util.StopWatch; import java.io.BufferedWriter; import java.io.IOException; import java.io.InputStream; -import java.io.InputStreamReader; import java.io.OutputStream; import java.io.OutputStreamWriter; import java.nio.charset.Charset; @@ -342,7 +341,7 @@ public class ReplaceText extends AbstractProcessor { final StringBuilder sb = new StringBuilder(value.length() + 1); final int groupStart = backRefMatcher.start(1); - sb.append(value.substring(0, groupStart - 1)); + sb.append(value, 0, groupStart - 1); sb.append("\\"); sb.append(value.substring(groupStart - 1)); value = sb.toString(); @@ -370,11 +369,11 @@ public class ReplaceText extends AbstractProcessor { flowFile = session.write(flowFile, new StreamCallback() { @Override public void process(final InputStream in, final OutputStream out) throws IOException { - try (NLKBufferedReader br = new NLKBufferedReader(new InputStreamReader(in, charset), maxBufferSize); - BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(out, charset));) { + try (final LineDemarcator demarcator = new LineDemarcator(in, charset, maxBufferSize, 8192); + final BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(out, charset))) { String line; - while ((line = br.readLine()) != null) { + while 
((line = demarcator.nextLine()) != null) { // We need to determine what line ending was used and use that after our replacement value. lineEndingBuilder.setLength(0); for (int i = line.length() - 1; i >= 0; i--) { @@ -423,10 +422,11 @@ public class ReplaceText extends AbstractProcessor { flowFile = session.write(flowFile, new StreamCallback() { @Override public void process(final InputStream in, final OutputStream out) throws IOException { - try (NLKBufferedReader br = new NLKBufferedReader(new InputStreamReader(in, charset), maxBufferSize); - BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(out, charset));) { + try (final LineDemarcator demarcator = new LineDemarcator(in, charset, maxBufferSize, 8192); + final BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(out, charset))) { + String oneLine; - while (null != (oneLine = br.readLine())) { + while (null != (oneLine = demarcator.nextLine())) { final String updatedValue = replacementValue.concat(oneLine); bw.write(updatedValue); } @@ -461,10 +461,11 @@ public class ReplaceText extends AbstractProcessor { flowFile = session.write(flowFile, new StreamCallback() { @Override public void process(final InputStream in, final OutputStream out) throws IOException { - try (NLKBufferedReader br = new NLKBufferedReader(new InputStreamReader(in, charset), maxBufferSize); - BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(out, charset));) { + try (final LineDemarcator demarcator = new LineDemarcator(in, charset, maxBufferSize, 8192); + final BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(out, charset))) { + String oneLine; - while (null != (oneLine = br.readLine())) { + while (null != (oneLine = demarcator.nextLine())) { // we need to find the first carriage return or new-line so that we can append the new value // before the line separate. However, we don't want to do this using a regular expression due // to performance concerns. 
So we will find the first occurrence of either \r or \n and use @@ -582,14 +583,15 @@ public class ReplaceText extends AbstractProcessor { updatedFlowFile = session.write(flowFile, new StreamCallback() { @Override public void process(final InputStream in, final OutputStream out) throws IOException { - try (NLKBufferedReader br = new NLKBufferedReader(new InputStreamReader(in, charset), maxBufferSize); - BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(out, charset))) { + try (final LineDemarcator demarcator = new LineDemarcator(in, charset, maxBufferSize, 8192); + final BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(out, charset))) { + String oneLine; final StringBuffer sb = new StringBuffer(); Matcher matcher = null; - while (null != (oneLine = br.readLine())) { + while (null != (oneLine = demarcator.nextLine())) { additionalAttrs.clear(); if (matcher == null) { matcher = searchPattern.matcher(oneLine); @@ -649,14 +651,7 @@ public class ReplaceText extends AbstractProcessor { public FlowFile replace(FlowFile flowFile, final ProcessSession session, final ProcessContext context, final String evaluateMode, final Charset charset, final int maxBufferSize) { final String replacementValue = context.getProperty(REPLACEMENT_VALUE).evaluateAttributeExpressions(flowFile).getValue(); - - final AttributeValueDecorator quotedAttributeDecorator = new AttributeValueDecorator() { - @Override - public String decorate(final String attributeValue) { - return Pattern.quote(attributeValue); - } - }; - + final AttributeValueDecorator quotedAttributeDecorator = Pattern::quote; final String searchValue = context.getProperty(SEARCH_VALUE).evaluateAttributeExpressions(flowFile, quotedAttributeDecorator).getValue(); final int flowFileSize = (int) flowFile.getSize(); @@ -672,16 +667,34 @@ public class ReplaceText extends AbstractProcessor { } }); } else { + final int initialBufferSize = (int) Math.max(1, Math.min(flowFile.getSize(), 8192)); // never 0: the demarcator rejects initialBufferSize <= 0 (empty FlowFile) + final Pattern searchPattern = 
Pattern.compile(searchValue, Pattern.LITERAL); + flowFile = session.write(flowFile, new StreamCallback() { @Override public void process(final InputStream in, final OutputStream out) throws IOException { - try (NLKBufferedReader br = new NLKBufferedReader(new InputStreamReader(in, charset), maxBufferSize); - BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(out, charset))) { + try (final LineDemarcator demarcator = new LineDemarcator(in, charset, maxBufferSize, initialBufferSize); + final BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(out, charset))) { + String oneLine; - while (null != (oneLine = br.readLine())) { - // Interpreting the search and replacement values as char sequences - final String updatedValue = oneLine.replace(searchValue, replacementValue); - bw.write(updatedValue); + while (null != (oneLine = demarcator.nextLine())) { + int matches = 0; + int lastEnd = 0; + + final Matcher matcher = searchPattern.matcher(oneLine); + while (matcher.find()) { + bw.write(oneLine, lastEnd, matcher.start() - lastEnd); + bw.write(replacementValue); + matches++; + + lastEnd = matcher.end(); + } + + if (matches > 0) { + bw.write(oneLine, lastEnd, oneLine.length() - lastEnd); + } else { + bw.write(oneLine); + } } } } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/RouteText.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/RouteText.java index a87434ef7c..4fbbce6a6a 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/RouteText.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/RouteText.java @@ -19,24 +19,6 @@ package org.apache.nifi.processors.standard; import com.google.common.annotations.VisibleForTesting; import com.google.common.cache.CacheBuilder; 
-import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; -import java.io.OutputStream; -import java.io.Reader; -import java.nio.charset.Charset; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.atomic.AtomicReference; -import java.util.regex.Matcher; -import java.util.regex.Pattern; import org.apache.commons.lang3.StringUtils; import org.apache.nifi.annotation.behavior.DynamicProperty; import org.apache.nifi.annotation.behavior.DynamicRelationship; @@ -68,8 +50,24 @@ import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.io.InputStreamCallback; import org.apache.nifi.processor.io.OutputStreamCallback; import org.apache.nifi.processor.util.StandardValidators; -import org.apache.nifi.processors.standard.util.NLKBufferedReader; +import org.apache.nifi.stream.io.util.LineDemarcator; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicReference; +import java.util.regex.Matcher; +import java.util.regex.Pattern; @EventDriven @SideEffectFree @@ -291,7 +289,7 @@ public class RouteText extends AbstractProcessor { final Set allDynamicProps = this.dynamicPropertyNames; final Set newRelationships = new HashSet<>(); final String routeStrategy = configuredRouteStrategy; - if (ROUTE_TO_MATCHING_PROPERTY_NAME.equals(routeStrategy)) { + if (ROUTE_TO_MATCHING_PROPERTY_NAME.getValue().equals(routeStrategy)) { for (final String 
propName : allDynamicProps) { newRelationships.add(new Relationship.Builder().name(propName).build()); } @@ -419,14 +417,13 @@ public class RouteText extends AbstractProcessor { session.read(originalFlowFile, new InputStreamCallback() { @Override public void process(final InputStream in) throws IOException { - try (final Reader inReader = new InputStreamReader(in, charset); - final NLKBufferedReader reader = new NLKBufferedReader(inReader)) { + try (final LineDemarcator demarcator = new LineDemarcator(in, charset, Integer.MAX_VALUE, 8192)) { final Map variables = new HashMap<>(2); int lineCount = 0; String line; - while ((line = reader.readLine()) != null) { + while ((line = demarcator.nextLine()) != null) { final String matchLine; if (trim) { @@ -550,11 +547,7 @@ public class RouteText extends AbstractProcessor { private void appendLine(final ProcessSession session, final Map> flowFileMap, final Relationship relationship, final FlowFile original, final String line, final Charset charset, final Group group) { - Map groupToFlowFileMap = flowFileMap.get(relationship); - if (groupToFlowFileMap == null) { - groupToFlowFileMap = new HashMap<>(); - flowFileMap.put(relationship, groupToFlowFileMap); - } + final Map groupToFlowFileMap = flowFileMap.computeIfAbsent(relationship, k -> new HashMap<>()); FlowFile flowFile = groupToFlowFileMap.get(group); if (flowFile == null) { diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/NLKBufferedReader.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/NLKBufferedReader.java deleted file mode 100644 index df8847f85a..0000000000 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/NLKBufferedReader.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * 
contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.standard.util; - -import java.io.BufferedReader; -import java.io.IOException; -import java.io.Reader; - -//NLKBufferedReader = New Line Keeper Buffered Reader -public class NLKBufferedReader extends BufferedReader { - public NLKBufferedReader(Reader in, int sz) { - super(in, sz); - } - - public NLKBufferedReader(Reader in) { - super(in); - } - - /** - * Reads a line of text in the same manner as {@link BufferedReader} except that any line-termination characters (\r and \n) are preserved in the String - * that is returned from this reader, whereas {@link BufferedReader} will strip those out. 
- * - * @return A String containing the next line of text (including any line-termination characters) from the underlying Reader, or null if no more data is available - * - * @throws IOException If unable to read from teh underlying Reader - */ - @Override - public String readLine() throws IOException { - final StringBuilder stringBuilder = new StringBuilder(); - - int intchar = read(); - while (intchar != -1) { - final char c = (char) intchar; - stringBuilder.append(c); - - if (c == '\n') { - break; - } else if (c == '\r') { - // Peek at next character, check if it's \n - int charPeek = peek(); - if (charPeek == '\n') { - stringBuilder.append((char) read()); - } - - break; - } - - intchar = read(); - } - - final String result = stringBuilder.toString(); - return (result.length() == 0) ? null : result; - } - - public int peek() throws IOException { - mark(1); - int readByte = read(); - reset(); - - return readByte; - } -}