mirror of https://github.com/apache/nifi.git
NIFI-2841 Refactoring logic in SplitAvro RecordSplitter to avoid making two calls in a row to reader.hasNext()
This closes #1088
This commit is contained in:
parent
d1d053725b
commit
78020825e9
|
@ -267,7 +267,8 @@ public class SplitAvro extends AbstractProcessor {
|
||||||
}
|
}
|
||||||
|
|
||||||
// while records are left, start a new split by spawning a FlowFile
|
// while records are left, start a new split by spawning a FlowFile
|
||||||
while (reader.hasNext()) {
|
final AtomicReference<Boolean> hasNextHolder = new AtomicReference<Boolean>(reader.hasNext());
|
||||||
|
while (hasNextHolder.get()) {
|
||||||
FlowFile childFlowFile = session.create(originalFlowFile);
|
FlowFile childFlowFile = session.create(originalFlowFile);
|
||||||
childFlowFile = session.write(childFlowFile, new OutputStreamCallback() {
|
childFlowFile = session.write(childFlowFile, new OutputStreamCallback() {
|
||||||
@Override
|
@Override
|
||||||
|
@ -277,11 +278,13 @@ public class SplitAvro extends AbstractProcessor {
|
||||||
|
|
||||||
// append to the current FlowFile until no more records, or splitSize is reached
|
// append to the current FlowFile until no more records, or splitSize is reached
|
||||||
int recordCount = 0;
|
int recordCount = 0;
|
||||||
while (reader.hasNext() && recordCount < splitSize) {
|
while (hasNextHolder.get() && recordCount < splitSize) {
|
||||||
recordHolder.set(reader.next(recordHolder.get()));
|
recordHolder.set(reader.next(recordHolder.get()));
|
||||||
splitWriter.write(recordHolder.get());
|
splitWriter.write(recordHolder.get());
|
||||||
recordCount++;
|
recordCount++;
|
||||||
|
hasNextHolder.set(reader.hasNext());
|
||||||
}
|
}
|
||||||
|
|
||||||
splitWriter.flush();
|
splitWriter.flush();
|
||||||
} finally {
|
} finally {
|
||||||
splitWriter.close();
|
splitWriter.close();
|
||||||
|
|
Loading…
Reference in New Issue