NIFI-5718: Implemented LineDemarcator and removed NLKBufferedReader in order to improve performance

This commit is contained in:
Mark Payne 2018-10-18 12:05:16 -04:00 committed by Peter Wicks
parent 765df67817
commit 564ad0cd71
No known key found for this signature in database
GPG Key ID: 79ABE9BA9C7AB3CD
7 changed files with 549 additions and 133 deletions

View File

@ -0,0 +1,103 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.stream.io;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Objects;
public class RepeatingInputStream extends InputStream {
private final byte[] toRepeat;
private final int maxIterations;
private InputStream bais;
private int repeatCount;
public RepeatingInputStream(final byte[] toRepeat, final int iterations) {
if (iterations < 1) {
throw new IllegalArgumentException();
}
if (Objects.requireNonNull(toRepeat).length == 0) {
throw new IllegalArgumentException();
}
this.toRepeat = toRepeat;
this.maxIterations = iterations;
repeat();
bais = new ByteArrayInputStream(toRepeat);
repeatCount = 1;
}
@Override
public int read() throws IOException {
final int value = bais.read();
if (value > -1) {
return value;
}
final boolean repeated = repeat();
if (repeated) {
return bais.read();
}
return -1;
}
@Override
public int read(final byte[] b, final int off, final int len) throws IOException {
final int value = bais.read(b, off, len);
if (value > -1) {
return value;
}
final boolean repeated = repeat();
if (repeated) {
return bais.read(b, off, len);
}
return -1;
}
@Override
public int read(final byte[] b) throws IOException {
final int value = bais.read(b);
if (value > -1) {
return value;
}
final boolean repeated = repeat();
if (repeated) {
return bais.read(b);
}
return -1;
}
private boolean repeat() {
if (repeatCount >= maxIterations) {
return false;
}
repeatCount++;
bais = new ByteArrayInputStream(toRepeat);
return true;
}
}

View File

@ -0,0 +1,147 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.stream.io.util;
import org.apache.nifi.stream.io.exception.TokenTooLargeException;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.Reader;
import java.nio.BufferOverflowException;
public abstract class AbstractTextDemarcator implements Closeable {
private static final int INIT_BUFFER_SIZE = 8192;
private final Reader reader;
/*
* The maximum allowed size of the token. In the event such size is exceeded
* TokenTooLargeException is thrown.
*/
private final int maxDataSize;
/*
* Buffer into which the bytes are read from the provided stream. The size
* of the buffer is defined by the 'initialBufferSize' provided in the
* constructor or defaults to the value of INIT_BUFFER_SIZE constant.
*/
char[] buffer;
/*
* Starting offset of the demarcated token within the current 'buffer'.
*/
int index;
/*
* Starting offset of the demarcated token within the current 'buffer'. Keep
* in mind that while most of the time it is the same as the 'index' it may
* also have a value of 0 at which point it serves as a signal to the fill()
* operation that buffer needs to be expended if end of token is not reached
* (see fill() operation for more details).
*/
int mark;
/*
* The length of the bytes valid for reading. It is different from the
* buffer length, since this number may be smaller (e.g., at he end of the
* stream) then actual buffer length. It is set by the fill() operation
* every time more bytes read into buffer.
*/
int availableBytesLength;
/**
* Constructs an instance of demarcator with provided {@link InputStream}
* and max buffer size. Each demarcated token must fit within max buffer
* size, otherwise the exception will be raised.
*/
AbstractTextDemarcator(Reader reader, int maxDataSize) {
this(reader, maxDataSize, INIT_BUFFER_SIZE);
}
/**
* Constructs an instance of demarcator with provided {@link InputStream}
* and max buffer size and initial buffer size. Each demarcated token must
* fit within max buffer size, otherwise the exception will be raised.
*/
AbstractTextDemarcator(Reader reader, int maxDataSize, int initialBufferSize) {
this.validate(reader, maxDataSize, initialBufferSize);
this.reader = reader;
this.buffer = new char[initialBufferSize];
this.maxDataSize = maxDataSize;
}
@Override
public void close() throws IOException {
reader.close();
}
/**
* Will fill the current buffer from current 'index' position, expanding it
* and or shuffling it if necessary. If buffer exceeds max buffer size a
* {@link TokenTooLargeException} will be thrown.
*
* @throws IOException
* if unable to read from the stream
*/
void fill() throws IOException {
if (this.index >= this.buffer.length) {
if (this.mark == 0) { // expand
long expandedSize = Math.min(this.buffer.length * 2, this.buffer.length + 1_048_576);
if (expandedSize > maxDataSize) {
throw new BufferOverflowException();
}
char[] newBuff = new char[(int) expandedSize];
System.arraycopy(this.buffer, 0, newBuff, 0, this.buffer.length);
this.buffer = newBuff;
} else { // shift the data left in the buffer
int length = this.index - this.mark;
System.arraycopy(this.buffer, this.mark, this.buffer, 0, length);
this.index = length;
this.mark = 0;
}
}
int bytesRead;
/*
* The do/while pattern is used here similar to the way it is used in
* BufferedReader essentially protecting from assuming the EOS until it
* actually is since not every implementation of InputStream guarantees
* that bytes are always available while the stream is open.
*/
do {
bytesRead = reader.read(this.buffer, this.index, this.buffer.length - this.index);
} while (bytesRead == 0);
this.availableBytesLength = bytesRead != -1 ? this.index + bytesRead : -1;
}
/**
* Validates prerequisites for constructor arguments
*/
private void validate(Reader reader, int maxDataSize, int initialBufferSize) {
if (reader == null) {
throw new IllegalArgumentException("'reader' must not be null");
} else if (maxDataSize <= 0) {
throw new IllegalArgumentException("'maxDataSize' must be > 0");
} else if (initialBufferSize <= 0) {
throw new IllegalArgumentException("'initialBufferSize' must be > 0");
}
}
}

View File

@ -0,0 +1,116 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.stream.io.util;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Reader;
import java.nio.charset.Charset;
/**
* A demarcator that scans an InputStream for line endings (carriage returns and new lines) and returns
* lines of text one-at-a-time. This is similar to BufferedReader but with a very important distinction: while
* BufferedReader returns the lines of text after stripping off any line endings, this class returns text including the
* line endings. So, for example, if the following text is provided:
*
* <code>ABC\rXYZ\nABCXYZ\r\nhello</code>
*
* Then calls to {@link #nextLine()} will result in 4 String values being returned:
*
* <ul>
* <li><code>ABC\r</code></li>
* <li><code>XYZ\n</code></li>
* <li><code>ABCXYZ\r\n</code></li>
* <li><code>hello</code></li>
* </ul>
*
* All subsequent calls to {@link #nextLine()} will return <code>null</code>.
*/
public class LineDemarcator extends AbstractTextDemarcator {
private static final char CARRIAGE_RETURN = '\r';
private static final char NEW_LINE = '\n';
private char lastChar;
public LineDemarcator(final InputStream in, final Charset charset, final int maxDataSize, final int initialBufferSize) {
this(new InputStreamReader(in, charset), maxDataSize, initialBufferSize);
}
public LineDemarcator(final Reader reader, final int maxDataSize, final int initialBufferSize) {
super(reader, maxDataSize, initialBufferSize);
}
/**
* Will read the next line of text from the {@link InputStream} returning null
* when it reaches the end of the stream.
*
* @throws IOException if unable to read from the stream
*/
public String nextLine() throws IOException {
while (this.availableBytesLength != -1) {
if (this.index >= this.availableBytesLength) {
this.fill();
}
if (this.availableBytesLength != -1) {
char charVal;
int i;
for (i = this.index; i < this.availableBytesLength; i++) {
charVal = this.buffer[i];
try {
if (charVal == NEW_LINE) {
this.index = i + 1;
final int size = this.index - this.mark;
final String line = new String(this.buffer, mark, size);
this.mark = this.index;
return line;
} else if (lastChar == CARRIAGE_RETURN) {
// Point this.index to i+1 because that's the next byte that we want to consume.
this.index = i + 1;
// Size is equal to where the line began, up to index-1 because we don't want to consume the last byte encountered.
final int size = this.index - 1 - this.mark;
final String line = new String(this.buffer, mark, size);
// set 'mark' to index - 1 because we don't want to consume the last byte that we've encountered, since we're basing our
// line on the previous byte.
this.mark = this.index - 1;
return line;
}
} finally {
lastChar = charVal;
}
}
this.index = i;
} else {
final int size = this.index - this.mark;
if (size == 0) {
return null;
}
return new String(this.buffer, mark, size);
}
}
return null;
}
}

View File

@ -0,0 +1,120 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.stream.io.util;
import org.junit.Test;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Reader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import static org.junit.Assert.assertEquals;
public class TestLineDemarcator {
@Test
public void testSingleCharacterLines() throws IOException {
final String input = "A\nB\nC\rD\r\nE\r\nF\r\rG";
final List<String> lines = getLines(input);
assertEquals(Arrays.asList("A\n", "B\n", "C\r", "D\r\n", "E\r\n", "F\r", "\r", "G"), lines);
}
@Test
public void testEmptyStream() throws IOException {
final List<String> lines = getLines("");
assertEquals(Collections.emptyList(), lines);
}
@Test
public void testOnlyEmptyLines() throws IOException {
final String input = "\r\r\r\n\n\n\r\n";
final List<String> lines = getLines(input);
assertEquals(Arrays.asList("\r", "\r", "\r\n", "\n", "\n", "\r\n"), lines);
}
@Test
public void testOnBufferSplit() throws IOException {
final String input = "ABC\r\nXYZ";
final List<String> lines = getLines(input, 10, 4);
assertEquals(Arrays.asList("ABC\r\n", "XYZ"), lines);
}
@Test
public void testEndsWithCarriageReturn() throws IOException {
final List<String> lines = getLines("ABC\r");
assertEquals(Arrays.asList("ABC\r"), lines);
}
@Test
public void testEndsWithNewLine() throws IOException {
final List<String> lines = getLines("ABC\n");
assertEquals(Arrays.asList("ABC\n"), lines);
}
@Test
public void testEndsWithCarriageReturnNewLine() throws IOException {
final List<String> lines = getLines("ABC\r\n");
assertEquals(Arrays.asList("ABC\r\n"), lines);
}
@Test
public void testReadAheadInIsEol() throws IOException {
final String input = "he\ra-to-a\rb-to-b\rc-to-c\r\nd-to-d";
final List<String> lines = getLines(input, 10, 10);
assertEquals(Arrays.asList("he\r", "a-to-a\r", "b-to-b\r", "c-to-c\r\n", "d-to-d"), lines);
}
@Test
public void testFirstCharMatchOnly() throws IOException {
final List<String> lines = getLines("\nThe quick brown fox jumped over the lazy dog.");
assertEquals(Arrays.asList("\n", "The quick brown fox jumped over the lazy dog."), lines);
}
private List<String> getLines(final String text) throws IOException {
return getLines(text, 8192, 8192);
}
private List<String> getLines(final String text, final int maxDataSize, final int bufferSize) throws IOException {
final byte[] bytes = text.getBytes(StandardCharsets.UTF_8);
final List<String> lines = new ArrayList<>();
try (final ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
final Reader reader = new InputStreamReader(bais, StandardCharsets.UTF_8);
final LineDemarcator demarcator = new LineDemarcator(reader, maxDataSize, bufferSize)) {
String line;
while ((line = demarcator.nextLine()) != null) {
lines.add(line);
}
}
return lines;
}
}

View File

@ -49,14 +49,13 @@ import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.io.StreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.standard.util.NLKBufferedReader;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.stream.io.util.LineDemarcator;
import org.apache.nifi.util.StopWatch;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.nio.charset.Charset;
@ -342,7 +341,7 @@ public class ReplaceText extends AbstractProcessor {
final StringBuilder sb = new StringBuilder(value.length() + 1);
final int groupStart = backRefMatcher.start(1);
sb.append(value.substring(0, groupStart - 1));
sb.append(value, 0, groupStart - 1);
sb.append("\\");
sb.append(value.substring(groupStart - 1));
value = sb.toString();
@ -370,11 +369,11 @@ public class ReplaceText extends AbstractProcessor {
flowFile = session.write(flowFile, new StreamCallback() {
@Override
public void process(final InputStream in, final OutputStream out) throws IOException {
try (NLKBufferedReader br = new NLKBufferedReader(new InputStreamReader(in, charset), maxBufferSize);
BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(out, charset));) {
try (final LineDemarcator demarcator = new LineDemarcator(in, charset, maxBufferSize, 8192);
final BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(out, charset))) {
String line;
while ((line = br.readLine()) != null) {
while ((line = demarcator.nextLine()) != null) {
// We need to determine what line ending was used and use that after our replacement value.
lineEndingBuilder.setLength(0);
for (int i = line.length() - 1; i >= 0; i--) {
@ -423,10 +422,11 @@ public class ReplaceText extends AbstractProcessor {
flowFile = session.write(flowFile, new StreamCallback() {
@Override
public void process(final InputStream in, final OutputStream out) throws IOException {
try (NLKBufferedReader br = new NLKBufferedReader(new InputStreamReader(in, charset), maxBufferSize);
BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(out, charset));) {
try (final LineDemarcator demarcator = new LineDemarcator(in, charset, maxBufferSize, 8192);
final BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(out, charset))) {
String oneLine;
while (null != (oneLine = br.readLine())) {
while (null != (oneLine = demarcator.nextLine())) {
final String updatedValue = replacementValue.concat(oneLine);
bw.write(updatedValue);
}
@ -461,10 +461,11 @@ public class ReplaceText extends AbstractProcessor {
flowFile = session.write(flowFile, new StreamCallback() {
@Override
public void process(final InputStream in, final OutputStream out) throws IOException {
try (NLKBufferedReader br = new NLKBufferedReader(new InputStreamReader(in, charset), maxBufferSize);
BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(out, charset));) {
try (final LineDemarcator demarcator = new LineDemarcator(in, charset, maxBufferSize, 8192);
final BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(out, charset))) {
String oneLine;
while (null != (oneLine = br.readLine())) {
while (null != (oneLine = demarcator.nextLine())) {
// we need to find the first carriage return or new-line so that we can append the new value
// before the line separate. However, we don't want to do this using a regular expression due
// to performance concerns. So we will find the first occurrence of either \r or \n and use
@ -582,14 +583,15 @@ public class ReplaceText extends AbstractProcessor {
updatedFlowFile = session.write(flowFile, new StreamCallback() {
@Override
public void process(final InputStream in, final OutputStream out) throws IOException {
try (NLKBufferedReader br = new NLKBufferedReader(new InputStreamReader(in, charset), maxBufferSize);
BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(out, charset))) {
try (final LineDemarcator demarcator = new LineDemarcator(in, charset, maxBufferSize, 8192);
final BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(out, charset))) {
String oneLine;
final StringBuffer sb = new StringBuffer();
Matcher matcher = null;
while (null != (oneLine = br.readLine())) {
while (null != (oneLine = demarcator.nextLine())) {
additionalAttrs.clear();
if (matcher == null) {
matcher = searchPattern.matcher(oneLine);
@ -649,14 +651,7 @@ public class ReplaceText extends AbstractProcessor {
public FlowFile replace(FlowFile flowFile, final ProcessSession session, final ProcessContext context, final String evaluateMode, final Charset charset, final int maxBufferSize) {
final String replacementValue = context.getProperty(REPLACEMENT_VALUE).evaluateAttributeExpressions(flowFile).getValue();
final AttributeValueDecorator quotedAttributeDecorator = new AttributeValueDecorator() {
@Override
public String decorate(final String attributeValue) {
return Pattern.quote(attributeValue);
}
};
final AttributeValueDecorator quotedAttributeDecorator = Pattern::quote;
final String searchValue = context.getProperty(SEARCH_VALUE).evaluateAttributeExpressions(flowFile, quotedAttributeDecorator).getValue();
final int flowFileSize = (int) flowFile.getSize();
@ -672,16 +667,34 @@ public class ReplaceText extends AbstractProcessor {
}
});
} else {
final int initialBufferSize = (int) Math.min(flowFile.getSize(), 8192);
final Pattern searchPattern = Pattern.compile(searchValue, Pattern.LITERAL);
flowFile = session.write(flowFile, new StreamCallback() {
@Override
public void process(final InputStream in, final OutputStream out) throws IOException {
try (NLKBufferedReader br = new NLKBufferedReader(new InputStreamReader(in, charset), maxBufferSize);
BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(out, charset))) {
try (final LineDemarcator demarcator = new LineDemarcator(in, charset, maxBufferSize, initialBufferSize);
final BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(out, charset))) {
String oneLine;
while (null != (oneLine = br.readLine())) {
// Interpreting the search and replacement values as char sequences
final String updatedValue = oneLine.replace(searchValue, replacementValue);
bw.write(updatedValue);
while (null != (oneLine = demarcator.nextLine())) {
int matches = 0;
int lastEnd = 0;
final Matcher matcher = searchPattern.matcher(oneLine);
while (matcher.find()) {
bw.write(oneLine, lastEnd, matcher.start() - lastEnd);
bw.write(replacementValue);
matches++;
lastEnd = matcher.end();
}
if (matches > 0) {
bw.write(oneLine, lastEnd, oneLine.length() - lastEnd);
} else {
bw.write(oneLine);
}
}
}
}

View File

@ -19,24 +19,6 @@ package org.apache.nifi.processors.standard;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.cache.CacheBuilder;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.Reader;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.DynamicRelationship;
@ -68,8 +50,24 @@ import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.standard.util.NLKBufferedReader;
import org.apache.nifi.stream.io.util.LineDemarcator;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@EventDriven
@SideEffectFree
@ -291,7 +289,7 @@ public class RouteText extends AbstractProcessor {
final Set<String> allDynamicProps = this.dynamicPropertyNames;
final Set<Relationship> newRelationships = new HashSet<>();
final String routeStrategy = configuredRouteStrategy;
if (ROUTE_TO_MATCHING_PROPERTY_NAME.equals(routeStrategy)) {
if (ROUTE_TO_MATCHING_PROPERTY_NAME.getValue().equals(routeStrategy)) {
for (final String propName : allDynamicProps) {
newRelationships.add(new Relationship.Builder().name(propName).build());
}
@ -419,14 +417,13 @@ public class RouteText extends AbstractProcessor {
session.read(originalFlowFile, new InputStreamCallback() {
@Override
public void process(final InputStream in) throws IOException {
try (final Reader inReader = new InputStreamReader(in, charset);
final NLKBufferedReader reader = new NLKBufferedReader(inReader)) {
try (final LineDemarcator demarcator = new LineDemarcator(in, charset, Integer.MAX_VALUE, 8192)) {
final Map<String, String> variables = new HashMap<>(2);
int lineCount = 0;
String line;
while ((line = reader.readLine()) != null) {
while ((line = demarcator.nextLine()) != null) {
final String matchLine;
if (trim) {
@ -550,11 +547,7 @@ public class RouteText extends AbstractProcessor {
private void appendLine(final ProcessSession session, final Map<Relationship, Map<Group, FlowFile>> flowFileMap, final Relationship relationship,
final FlowFile original, final String line, final Charset charset, final Group group) {
Map<Group, FlowFile> groupToFlowFileMap = flowFileMap.get(relationship);
if (groupToFlowFileMap == null) {
groupToFlowFileMap = new HashMap<>();
flowFileMap.put(relationship, groupToFlowFileMap);
}
final Map<Group, FlowFile> groupToFlowFileMap = flowFileMap.computeIfAbsent(relationship, k -> new HashMap<>());
FlowFile flowFile = groupToFlowFileMap.get(group);
if (flowFile == null) {

View File

@ -1,76 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard.util;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
//NLKBufferedReader = New Line Keeper Buffered Reader
public class NLKBufferedReader extends BufferedReader {
public NLKBufferedReader(Reader in, int sz) {
super(in, sz);
}
public NLKBufferedReader(Reader in) {
super(in);
}
/**
* Reads a line of text in the same manner as {@link BufferedReader} except that any line-termination characters (\r and \n) are preserved in the String
* that is returned from this reader, whereas {@link BufferedReader} will strip those out.
*
* @return A String containing the next line of text (including any line-termination characters) from the underlying Reader, or null if no more data is available
*
* @throws IOException If unable to read from teh underlying Reader
*/
@Override
public String readLine() throws IOException {
final StringBuilder stringBuilder = new StringBuilder();
int intchar = read();
while (intchar != -1) {
final char c = (char) intchar;
stringBuilder.append(c);
if (c == '\n') {
break;
} else if (c == '\r') {
// Peek at next character, check if it's \n
int charPeek = peek();
if (charPeek == '\n') {
stringBuilder.append((char) read());
}
break;
}
intchar = read();
}
final String result = stringBuilder.toString();
return (result.length() == 0) ? null : result;
}
public int peek() throws IOException {
mark(1);
int readByte = read();
reset();
return readByte;
}
}