NIFI-10888: When inferring a schema using a Record Reader, buffer up to 1 MB of FlowFile content for the schema inference so that when we read the contents to obtain records we can use the buffered data. This helps in cases of small FlowFiles by not having to seek back to the beginning of the FlowFile every time.

Signed-off-by: Matthew Burgess <mattyb149@apache.org>

This closes #6725
This commit is contained in:
Mark Payne 2022-11-28 13:39:36 -05:00 committed by Matthew Burgess
parent ed6ba53724
commit 78be613a0f
No known key found for this signature in database
GPG Key ID: 05D3DEB8126DAD24
2 changed files with 53 additions and 5 deletions

View File

@ -22,6 +22,7 @@ import org.apache.nifi.controller.repository.metrics.PerformanceTracker;
import org.apache.nifi.controller.repository.metrics.PerformanceTrackingInputStream;
import org.apache.nifi.stream.io.StreamUtils;
import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
@ -40,6 +41,8 @@ public class ContentClaimInputStream extends InputStream {
private long bytesConsumed;
private long currentOffset; // offset into the Content Claim; will differ from bytesRead if reset() is called after reading at least one byte or if claimOffset > 0
private long markOffset;
private InputStream bufferedIn;
private int markReadLimit;
public ContentClaimInputStream(final ContentRepository contentRepository, final ContentClaim contentClaim, final long claimOffset, final PerformanceTracker performanceTracker) {
this(contentRepository, contentClaim, claimOffset, null, performanceTracker);
@ -54,9 +57,13 @@ public class ContentClaimInputStream extends InputStream {
this.currentOffset = claimOffset;
this.delegate = initialDelegate;
if (delegate != null) {
this.bufferedIn = new BufferedInputStream(delegate);
}
}
private InputStream getDelegate() throws IOException {
bufferedIn = null;
if (delegate == null) {
formDelegate();
}
@ -74,7 +81,14 @@ public class ContentClaimInputStream extends InputStream {
@Override
public int read() throws IOException {
final int value = getDelegate().read();
int value = -1;
if (bufferedIn != null) {
value = bufferedIn.read();
}
if (value < 0) {
value = getDelegate().read();
}
if (value != -1) {
bytesConsumed++;
currentOffset++;
@ -85,7 +99,14 @@ public class ContentClaimInputStream extends InputStream {
@Override
public int read(final byte[] b) throws IOException {
final int count = getDelegate().read(b);
int count = -1;
if (bufferedIn != null) {
count = bufferedIn.read(b);
}
if (count < 0) {
count = getDelegate().read(b);
}
if (count != -1) {
bytesConsumed += count;
currentOffset += count;
@ -96,7 +117,14 @@ public class ContentClaimInputStream extends InputStream {
@Override
public int read(final byte[] b, final int off, final int len) throws IOException {
final int count = getDelegate().read(b, off, len);
int count = -1;
if (bufferedIn != null) {
count = bufferedIn.read(b, off, len);
}
if (count < 0) {
count = getDelegate().read(b, off, len);
}
if (count != -1) {
bytesConsumed += count;
currentOffset += count;
@ -133,6 +161,10 @@ public class ContentClaimInputStream extends InputStream {
@Override
public void mark(final int readlimit) {
markOffset = currentOffset;
markReadLimit = readlimit;
if (bufferedIn != null) {
bufferedIn.mark(readlimit);
}
}
@Override
@ -141,6 +173,13 @@ public class ContentClaimInputStream extends InputStream {
throw new IOException("Stream has not been marked");
}
if (bufferedIn != null && bytesConsumed <= markReadLimit) {
bufferedIn.reset();
currentOffset = markOffset;
return;
}
if (currentOffset != markOffset) {
if (delegate != null) {
delegate.close();
@ -176,8 +215,17 @@ public class ContentClaimInputStream extends InputStream {
delegate = new PerformanceTrackingInputStream(contentRepository.read(contentClaim), performanceTracker);
StreamUtils.skip(delegate, claimOffset);
currentOffset = claimOffset;
if (markReadLimit > 0) {
final int limitLeft = (int) (markReadLimit - currentOffset);
if (limitLeft > 0) {
final InputStream limitedIn = new LimitedInputStream(delegate, limitLeft);
bufferedIn = new BufferedInputStream(limitedIn);
bufferedIn.mark(limitLeft);
}
}
} finally {
performanceTracker.endContentRead();;
performanceTracker.endContentRead();
}
}
}

View File

@ -43,7 +43,7 @@ public class InferSchemaAccessStrategy<T> implements SchemaAccessStrategy {
public RecordSchema getSchema(final Map<String, String> variables, final InputStream contentStream, final RecordSchema readSchema) throws IOException {
// We expect to be able to mark/reset any length because we expect that the underlying stream here will be a ContentClaimInputStream, which is able to
// re-read the content regardless of how much data is read.
contentStream.mark(10_000_000);
contentStream.mark(1_000_000);
try {
final RecordSource<T> recordSource = recordSourceFactory.create(variables, new NonCloseableInputStream(contentStream));
final RecordSchema schema = schemaInference.inferSchema(recordSource);