diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExtractText.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExtractText.java index 5a386a628b..2862c342c9 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExtractText.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExtractText.java @@ -26,13 +26,22 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicReference; import java.util.regex.Matcher; import java.util.regex.Pattern; +import org.apache.nifi.annotation.behavior.DynamicProperty; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.SideEffectFree; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.flowfile.FlowFile; -import org.apache.nifi.stream.io.StreamUtils; import org.apache.nifi.logging.ProcessorLog; import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.DataUnit; @@ -40,15 +49,9 @@ import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.ProcessorInitializationContext; import org.apache.nifi.processor.Relationship; -import org.apache.nifi.annotation.documentation.CapabilityDescription; -import org.apache.nifi.annotation.behavior.DynamicProperty; -import org.apache.nifi.annotation.behavior.EventDriven; -import org.apache.nifi.annotation.behavior.SideEffectFree; -import org.apache.nifi.annotation.behavior.SupportsBatching; -import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.processor.io.InputStreamCallback; import org.apache.nifi.processor.util.StandardValidators; -import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.stream.io.StreamUtils; @EventDriven @SideEffectFree @@ -186,6 +189,7 @@ public class ExtractText extends AbstractProcessor { private Set relationships; private List properties; + private final BlockingQueue bufferQueue = new LinkedBlockingQueue<>(); private final AtomicReference> compiledPattersMapRef = new AtomicReference<>(); @Override @@ -245,6 +249,17 @@ public class ExtractText extends AbstractProcessor { compiledPatternsMap.put(entry.getKey().getName(), pattern); } compiledPattersMapRef.set(compiledPatternsMap); + + for (int i = 0; i < context.getMaxConcurrentTasks(); i++) { + final int maxBufferSize = context.getProperty(MAX_BUFFER_SIZE).asDataSize(DataUnit.B).intValue(); + final byte[] buffer = new byte[maxBufferSize]; + bufferQueue.add(buffer); + } + } + + @OnStopped + public void onStopped() { + bufferQueue.clear(); } @Override @@ -256,17 +271,29 @@ public class ExtractText extends AbstractProcessor { final ProcessorLog logger = getLogger(); final Charset charset = Charset.forName(context.getProperty(CHARACTER_SET).getValue()); final int maxCaptureGroupLength = context.getProperty(MAX_CAPTURE_GROUP_LENGTH).asInteger(); - final long maxBufferSizeL = Math.min(flowFile.getSize(), context.getProperty(MAX_BUFFER_SIZE).asDataSize(DataUnit.B).longValue()); - final byte[] buffer = new byte[(int) maxBufferSizeL]; - session.read(flowFile, new InputStreamCallback() { - @Override - public void process(InputStream in) throws IOException { - StreamUtils.fillBuffer(in, buffer, false); - } - }); + final String contentString; + byte[] buffer = bufferQueue.poll(); + if (buffer == null) { + final int maxBufferSize = context.getProperty(MAX_BUFFER_SIZE).asDataSize(DataUnit.B).intValue(); + buffer = new byte[maxBufferSize]; + } + + try { + final byte[] byteBuffer = buffer; + session.read(flowFile, new InputStreamCallback() { + @Override + public void process(InputStream in) throws IOException { + StreamUtils.fillBuffer(in, byteBuffer, false); + } + }); + + final long len = Math.min(byteBuffer.length, flowFile.getSize()); + contentString = new String(byteBuffer, 0, (int) len, charset); + } finally { + bufferQueue.offer(buffer); + } - final String contentString = new String(buffer, 0, (int) maxBufferSizeL, charset); final Map regexResults = new HashMap<>(); final Map patternMap = compiledPattersMapRef.get();