NIFI-11636: Do not buffer Parquet content into memory unnecessarily

NIFI-11636: Change the default log level for Parquet's internal record reader to WARN, as it logs excessively at the INFO level
Signed-off-by: Matt Burgess <mattyb149@apache.org>
This commit is contained in:
Mark Payne 2023-06-02 13:21:15 -04:00 committed by Matt Burgess
parent d4e9bd5ce9
commit 5da77e8e74
3 changed files with 7 additions and 8 deletions

View File

@ -119,7 +119,7 @@
<logger name="org.apache.nifi.processors.standard.LogAttribute" level="INFO"/> <logger name="org.apache.nifi.processors.standard.LogAttribute" level="INFO"/>
<logger name="org.apache.nifi.processors.standard.LogMessage" level="INFO"/> <logger name="org.apache.nifi.processors.standard.LogMessage" level="INFO"/>
<logger name="org.apache.nifi.controller.repository.StandardProcessSession" level="WARN" /> <logger name="org.apache.nifi.controller.repository.StandardProcessSession" level="WARN" />
<logger name="org.apache.parquet.hadoop.InternalParquetRecordReader" level="WARN" />
<logger name="org.apache.zookeeper.ClientCnxn" level="ERROR" /> <logger name="org.apache.zookeeper.ClientCnxn" level="ERROR" />
<logger name="org.apache.zookeeper.server.NIOServerCnxn" level="ERROR" /> <logger name="org.apache.zookeeper.server.NIOServerCnxn" level="ERROR" />

View File

@ -20,7 +20,6 @@ import org.apache.nifi.stream.io.ByteCountingInputStream;
import org.apache.parquet.io.InputFile; import org.apache.parquet.io.InputFile;
import org.apache.parquet.io.SeekableInputStream; import org.apache.parquet.io.SeekableInputStream;
import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
public class NifiParquetInputFile implements InputFile { public class NifiParquetInputFile implements InputFile {
@ -42,12 +41,12 @@ public class NifiParquetInputFile implements InputFile {
} }
@Override @Override
public long getLength() throws IOException { public long getLength() {
return length; return length;
} }
@Override @Override
public SeekableInputStream newStream() throws IOException { public SeekableInputStream newStream() {
return new NifiSeekableInputStream(input); return new NifiSeekableInputStream(input);
} }
} }

View File

@ -29,11 +29,11 @@ public class NifiSeekableInputStream extends DelegatingSeekableInputStream {
public NifiSeekableInputStream(final ByteCountingInputStream input) { public NifiSeekableInputStream(final ByteCountingInputStream input) {
super(input); super(input);
this.input = input; this.input = input;
this.input.mark(Integer.MAX_VALUE); this.input.mark(8192);
} }
@Override @Override
public long getPos() throws IOException { public long getPos() {
return input.getBytesConsumed(); return input.getBytesConsumed();
} }
@ -47,7 +47,7 @@ public class NifiSeekableInputStream extends DelegatingSeekableInputStream {
if (newPos < currentPos) { if (newPos < currentPos) {
// seeking backwards so first reset back to beginning of the stream then seek // seeking backwards so first reset back to beginning of the stream then seek
input.reset(); input.reset();
input.mark(Integer.MAX_VALUE); input.mark(8192);
} }
// must call getPos() again in case reset was called above // must call getPos() again in case reset was called above
@ -65,7 +65,7 @@ public class NifiSeekableInputStream extends DelegatingSeekableInputStream {
} }
@Override @Override
public synchronized void reset() throws IOException { public synchronized void reset() {
throw new UnsupportedOperationException("Mark/reset is not supported"); throw new UnsupportedOperationException("Mark/reset is not supported");
} }
} }