NIFI-6955 close streams in Reader/writer mock

This commit is contained in:
KovalevIV 2019-12-19 22:26:31 +03:00
parent 4b08cf116c
commit 8f9fc49ca0
2 changed files with 15 additions and 7 deletions

View File

@ -21,6 +21,7 @@ import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Iterator;
@ -37,7 +38,7 @@ public class ArrayListRecordReader extends AbstractControllerService implements
@Override
public ArrayListReader createRecordReader(final Map<String, String> variables, final InputStream in, final long inputLength, final ComponentLog logger) {
return new ArrayListReader(records, schema);
return new ArrayListReader(records, schema, in);
}
public void addRecord(final Record record) {
@ -47,10 +48,12 @@ public class ArrayListRecordReader extends AbstractControllerService implements
public static class ArrayListReader implements RecordReader {
private final RecordSchema schema;
private final Iterator<Record> itr;
private final InputStream in;
public ArrayListReader(final List<Record> records, final RecordSchema schema) {
public ArrayListReader(final List<Record> records, final RecordSchema schema, InputStream in) {
this.itr = records.iterator();
this.schema = schema;
this.in = in;
}
@Override
@ -64,7 +67,8 @@ public class ArrayListRecordReader extends AbstractControllerService implements
}
@Override
public void close(){
public void close() throws IOException {
in.close();
}
}
}

View File

@ -49,7 +49,7 @@ public class ArrayListRecordWriter extends AbstractControllerService implements
@Override
public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchema schema, final OutputStream out, final Map<String, String> variables) {
return new ArrayListRecordSetWriter(records);
return new ArrayListRecordSetWriter(records, out);
}
public List<Record> getRecordsWritten() {
@ -58,9 +58,11 @@ public class ArrayListRecordWriter extends AbstractControllerService implements
public static class ArrayListRecordSetWriter implements RecordSetWriter {
private final List<Record> records;
private final OutputStream out;
public ArrayListRecordSetWriter(final List<Record> records) {
public ArrayListRecordSetWriter(final List<Record> records, OutputStream out) {
this.records = records;
this.out = out;
}
@Override
@ -97,11 +99,13 @@ public class ArrayListRecordWriter extends AbstractControllerService implements
}
@Override
public void flush() {
public void flush() throws IOException {
out.flush();
}
@Override
public void close() {
public void close() throws IOException {
out.close();
}
}
}