NIFI-807: Cache the buffers that will be used to write data when reading from FlowFiles

This commit is contained in:
Mark Payne 2015-08-04 14:37:54 -04:00
parent 15d49025f2
commit e59ee5dda1
1 changed files with 44 additions and 17 deletions

View File

@ -26,13 +26,22 @@ import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Matcher; import java.util.regex.Matcher;
import java.util.regex.Pattern; import java.util.regex.Pattern;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.logging.ProcessorLog; import org.apache.nifi.logging.ProcessorLog;
import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.DataUnit; import org.apache.nifi.processor.DataUnit;
@ -40,15 +49,9 @@ import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext; import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.Relationship;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.processor.io.InputStreamCallback; import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.stream.io.StreamUtils;
@EventDriven @EventDriven
@SideEffectFree @SideEffectFree
@ -186,6 +189,7 @@ public class ExtractText extends AbstractProcessor {
private Set<Relationship> relationships; private Set<Relationship> relationships;
private List<PropertyDescriptor> properties; private List<PropertyDescriptor> properties;
private final BlockingQueue<byte[]> bufferQueue = new LinkedBlockingQueue<>();
private final AtomicReference<Map<String, Pattern>> compiledPattersMapRef = new AtomicReference<>(); private final AtomicReference<Map<String, Pattern>> compiledPattersMapRef = new AtomicReference<>();
@Override @Override
@ -245,6 +249,17 @@ public class ExtractText extends AbstractProcessor {
compiledPatternsMap.put(entry.getKey().getName(), pattern); compiledPatternsMap.put(entry.getKey().getName(), pattern);
} }
compiledPattersMapRef.set(compiledPatternsMap); compiledPattersMapRef.set(compiledPatternsMap);
for (int i = 0; i < context.getMaxConcurrentTasks(); i++) {
final int maxBufferSize = context.getProperty(MAX_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
final byte[] buffer = new byte[maxBufferSize];
bufferQueue.add(buffer);
}
}
@OnStopped
public void onStopped() {
bufferQueue.clear();
} }
@Override @Override
@ -256,17 +271,29 @@ public class ExtractText extends AbstractProcessor {
final ProcessorLog logger = getLogger(); final ProcessorLog logger = getLogger();
final Charset charset = Charset.forName(context.getProperty(CHARACTER_SET).getValue()); final Charset charset = Charset.forName(context.getProperty(CHARACTER_SET).getValue());
final int maxCaptureGroupLength = context.getProperty(MAX_CAPTURE_GROUP_LENGTH).asInteger(); final int maxCaptureGroupLength = context.getProperty(MAX_CAPTURE_GROUP_LENGTH).asInteger();
final long maxBufferSizeL = Math.min(flowFile.getSize(), context.getProperty(MAX_BUFFER_SIZE).asDataSize(DataUnit.B).longValue());
final byte[] buffer = new byte[(int) maxBufferSizeL];
final String contentString;
byte[] buffer = bufferQueue.poll();
if (buffer == null) {
final int maxBufferSize = context.getProperty(MAX_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
buffer = new byte[maxBufferSize];
}
try {
final byte[] byteBuffer = buffer;
session.read(flowFile, new InputStreamCallback() { session.read(flowFile, new InputStreamCallback() {
@Override @Override
public void process(InputStream in) throws IOException { public void process(InputStream in) throws IOException {
StreamUtils.fillBuffer(in, buffer, false); StreamUtils.fillBuffer(in, byteBuffer, false);
} }
}); });
final String contentString = new String(buffer, 0, (int) maxBufferSizeL, charset); final long len = Math.min(byteBuffer.length, flowFile.getSize());
contentString = new String(byteBuffer, 0, (int) len, charset);
} finally {
bufferQueue.offer(buffer);
}
final Map<String, String> regexResults = new HashMap<>(); final Map<String, String> regexResults = new HashMap<>();
final Map<String, Pattern> patternMap = compiledPattersMapRef.get(); final Map<String, Pattern> patternMap = compiledPattersMapRef.get();