diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/AbstractRecordSetWriter.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/AbstractRecordSetWriter.java index 6bf574f455..6ce91385e2 100644 --- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/AbstractRecordSetWriter.java +++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/AbstractRecordSetWriter.java @@ -39,6 +39,11 @@ public abstract class AbstractRecordSetWriter implements RecordSetWriter { this.out.close(); } + @Override + public void flush() throws IOException { + out.flush(); + } + @Override public WriteResult write(final RecordSet recordSet) throws IOException { beginRecordSet(); diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/RecordWriter.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/RecordWriter.java index 720953c8f8..6c21a391f8 100644 --- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/RecordWriter.java +++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/RecordWriter.java @@ -37,4 +37,11 @@ public interface RecordWriter extends Closeable { * the mime.type attribute. */ String getMimeType(); + + /** + * Flushes any buffered data to the underlying storage mechanism + * + * @throws IOException if unable to write to the underlying storage mechanism + */ + void flush() throws IOException; } diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordWriter.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordWriter.java index 1d6aafe199..891bbe35a4 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordWriter.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordWriter.java @@ -65,6 +65,11 @@ public class MockRecordWriter extends AbstractControllerService implements Recor private int recordCount = 0; private boolean headerWritten = false; + @Override + public void flush() throws IOException { + out.flush(); + } + @Override public WriteResult write(final RecordSet rs) throws IOException { if (header != null && !headerWritten) { diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java index 4b3a3ae085..66641dfac7 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java @@ -113,6 +113,7 @@ public class PublisherLease implements Closeable { recordCount++; baos.reset(); writer.write(record); + writer.flush(); final byte[] messageContent = baos.toByteArray(); final String key = messageKeyField == null ? null : record.getAsString(messageKeyField); diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordWriter.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordWriter.java index 27df57bc6e..15496263a7 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordWriter.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordWriter.java @@ -60,6 +60,12 @@ public class MockRecordWriter extends AbstractControllerService implements Recor @Override public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchema schema, final FlowFile flowFile, final OutputStream out) { return new RecordSetWriter() { + + @Override + public void flush() throws IOException { + out.flush(); + } + @Override public WriteResult write(final RecordSet rs) throws IOException { out.write(header.getBytes()); diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_record_writer_inline.groovy b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_record_writer_inline.groovy index b0daaca18e..c9611710ae 100644 --- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_record_writer_inline.groovy +++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_record_writer_inline.groovy @@ -93,6 +93,10 @@ class GroovyRecordSetWriter implements RecordSetWriter { @Override public void close() throws IOException { } + + @Override + public void flush() throws IOException { + } } class GroovyRecordSetWriterFactory extends AbstractControllerService implements RecordSetWriterFactory { diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java index c00eb4b300..c6035f2e51 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java @@ -271,6 +271,12 @@ public class TestQueryRecord { @Override public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchema schema, final FlowFile flowFile, final OutputStream out) { return new RecordSetWriter() { + + @Override + public void flush() throws IOException { + out.flush(); + } + @Override public WriteResult write(final RecordSet rs) throws IOException { final int colCount = rs.getSchema().getFieldCount(); diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResult.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResult.java deleted file mode 100644 index 799d3eeeb5..0000000000 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResult.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.nifi.avro; - -import org.apache.avro.Schema; -import org.apache.avro.file.DataFileWriter; -import org.apache.avro.generic.GenericDatumWriter; -import org.apache.avro.generic.GenericRecord; -import org.apache.avro.io.DatumWriter; -import org.apache.nifi.serialization.RecordSetWriter; -import org.apache.nifi.serialization.WriteResult; -import org.apache.nifi.serialization.record.Record; - -import java.io.IOException; -import java.io.OutputStream; -import java.util.Collections; - -public abstract class WriteAvroResult implements RecordSetWriter { - private final Schema schema; - private final OutputStream out; - - public WriteAvroResult(final Schema schema, final OutputStream out) { - this.schema = schema; - this.out = out; - } - - protected Schema getSchema() { - return schema; - } - - @Override - public WriteResult write(final Record record) throws IOException { - final GenericRecord rec = AvroTypeUtil.createAvroRecord(record, schema); - - final DatumWriter datumWriter = new GenericDatumWriter<>(schema); - try (final DataFileWriter dataFileWriter = new DataFileWriter<>(datumWriter)) { - dataFileWriter.create(schema, out); - dataFileWriter.append(rec); - } - - return WriteResult.of(1, Collections.emptyMap()); - } - - @Override - public String getMimeType() { - return "application/avro-binary"; - } -} diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithExternalSchema.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithExternalSchema.java index c1f000b418..25d494ea0c 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithExternalSchema.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithExternalSchema.java @@ -61,8 +61,7 @@ public class WriteAvroResultWithExternalSchema extends AbstractRecordSetWriter { @Override protected Map onFinishRecordSet() throws IOException { - encoder.flush(); - buffered.flush(); + flush(); return schemaAccessWriter.getAttributes(recordSchema); } @@ -73,6 +72,12 @@ public class WriteAvroResultWithExternalSchema extends AbstractRecordSetWriter { return schemaAccessWriter.getAttributes(recordSchema); } + @Override + public void flush() throws IOException { + encoder.flush(); + buffered.flush(); + } + @Override public String getMimeType() { return "application/avro-binary"; diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithSchema.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithSchema.java index dd151180f1..ae2f109232 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithSchema.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithSchema.java @@ -48,6 +48,11 @@ public class WriteAvroResultWithSchema extends AbstractRecordSetWriter { dataFileWriter.close(); } + @Override + public void flush() throws IOException { + dataFileWriter.flush(); + } + @Override public Map writeRecord(final Record record) throws IOException { final GenericRecord rec = AvroTypeUtil.createAvroRecord(record, schema); diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/WriteCSVResult.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/WriteCSVResult.java index f8998f92e4..34a51babff 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/WriteCSVResult.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/WriteCSVResult.java @@ -88,6 +88,11 @@ public class WriteCSVResult extends AbstractRecordSetWriter implements RecordSet printer.close(); } + @Override + public void flush() throws IOException { + printer.flush(); + } + @Override public Map writeRecord(final Record record) throws IOException { int i = 0; diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java index a41412f0dc..8acaa04f5a 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java @@ -95,6 +95,13 @@ public class WriteJsonResult extends AbstractRecordSetWriter implements RecordSe super.close(); } + @Override + public void flush() throws IOException { + if (generator != null) { + generator.flush(); + } + } + @Override public Map writeRecord(final Record record) throws IOException { writeRecord(record, recordSchema, generator, g -> g.writeStartObject(), g -> g.writeEndObject()); diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/text/FreeFormTextWriter.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/text/FreeFormTextWriter.java index 7012504ca8..f22f592902 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/text/FreeFormTextWriter.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/text/FreeFormTextWriter.java @@ -17,6 +17,7 @@ package org.apache.nifi.text; +import java.io.BufferedOutputStream; import java.io.IOException; import java.io.OutputStream; import java.nio.charset.Charset; @@ -37,13 +38,11 @@ public class FreeFormTextWriter extends AbstractRecordSetWriter implements Recor private static final byte NEW_LINE = (byte) '\n'; private final PropertyValue propertyValue; private final Charset charset; - private final OutputStream out; public FreeFormTextWriter(final PropertyValue textPropertyValue, final Charset characterSet, final OutputStream out) { - super(out); + super(new BufferedOutputStream(out)); this.propertyValue = textPropertyValue; this.charset = characterSet; - this.out = out; } private List getColumnNames(final RecordSchema schema) { @@ -60,7 +59,7 @@ public class FreeFormTextWriter extends AbstractRecordSetWriter implements Recor @Override public Map writeRecord(final Record record) throws IOException { - write(record, out, getColumnNames(record.getSchema())); + write(record, getOutputStream(), getColumnNames(record.getSchema())); return Collections.emptyMap(); }