NIFI-10576 Added onFinishRecordSet implementation for WriteParquetResult

This closes #6517

Co-authored-by: David Handermann <exceptionfactory@apache.org>
Signed-off-by: David Handermann <exceptionfactory@apache.org>
Author: Nathan Gough (2022-10-12 13:33:38 -04:00), committed by exceptionfactory
Parent: ac8e3dae58
Commit: 2fa82179a8

3 changed files with 37 additions and 10 deletions

ParquetRecordSetWriter.java

@@ -107,7 +107,7 @@ public class ParquetRecordSetWriter extends SchemaRegistryRecordSetWriter implem
                 throw new SchemaNotFoundException("Failed to compile Avro Schema", e);
             }
-            return new WriteParquetResult(avroSchema, out, parquetConfig, logger);
+            return new WriteParquetResult(avroSchema, recordSchema, getSchemaAccessWriter(recordSchema, variables), out, parquetConfig, logger);
         } catch (final SchemaNotFoundException e) {
             throw new ProcessException("Could not determine the Avro Schema to use for writing the content", e);

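For context: ParquetRecordSetWriter inherits getSchemaAccessWriter from SchemaRegistryRecordSetWriter (its parent class, visible in the hunk header), so the configured Schema Write Strategy now travels into WriteParquetResult. Below is a trimmed, illustrative sketch of the one SchemaAccessWriter method this change relies on; the real interface declares additional methods, and this subset is an assumption for illustration only.

```java
import java.util.Map;

import org.apache.nifi.serialization.record.RecordSchema;

// Illustrative subset of org.apache.nifi.schema.access.SchemaAccessWriter:
// getAttributes maps a RecordSchema to FlowFile attributes describing the
// schema (for example, the schema name under the "schema-name" strategy).
interface SchemaAccessWriterSubset {
    Map<String, String> getAttributes(RecordSchema schema);
}
```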
WriteParquetResult.java

@@ -23,8 +23,10 @@ import org.apache.nifi.avro.AvroTypeUtil;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.parquet.stream.NifiParquetOutputFile;
 import org.apache.nifi.parquet.utils.ParquetConfig;
+import org.apache.nifi.schema.access.SchemaAccessWriter;
 import org.apache.nifi.serialization.AbstractRecordSetWriter;
 import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
 import org.apache.parquet.avro.AvroParquetWriter;
 import org.apache.parquet.hadoop.ParquetWriter;
 import org.apache.parquet.io.OutputFile;
@@ -41,17 +43,21 @@ public class WriteParquetResult extends AbstractRecordSetWriter {
     private final Schema schema;
     private final ParquetWriter<GenericRecord> parquetWriter;
     private final ComponentLog componentLogger;
+    private SchemaAccessWriter accessWriter;
+    private RecordSchema recordSchema;
 
-    public WriteParquetResult(final Schema schema, final OutputStream out, final ParquetConfig parquetConfig, final ComponentLog componentLogger) throws IOException {
+    public WriteParquetResult(final Schema avroSchema, final RecordSchema recordSchema, final SchemaAccessWriter accessWriter, final OutputStream out,
+                              final ParquetConfig parquetConfig, final ComponentLog componentLogger) throws IOException {
         super(out);
-        this.schema = schema;
+        this.schema = avroSchema;
         this.componentLogger = componentLogger;
+        this.accessWriter = accessWriter;
+        this.recordSchema = recordSchema;
 
         final Configuration conf = new Configuration();
         final OutputFile outputFile = new NifiParquetOutputFile(out);
-        final AvroParquetWriter.Builder<GenericRecord> writerBuilder =
-                AvroParquetWriter.<GenericRecord>builder(outputFile).withSchema(schema);
+        final AvroParquetWriter.Builder<GenericRecord> writerBuilder = AvroParquetWriter.<GenericRecord>builder(outputFile).withSchema(avroSchema);
         applyCommonConfig(writerBuilder, conf, parquetConfig);
         parquetWriter = writerBuilder.build();
     }
@@ -63,6 +69,11 @@ public class WriteParquetResult extends AbstractRecordSetWriter {
         return Collections.emptyMap();
     }
 
+    @Override
+    protected Map<String, String> onFinishRecordSet() {
+        return accessWriter.getAttributes(recordSchema);
+    }
+
     @Override
     public void close() throws IOException {
         try {

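The new onFinishRecordSet override is the piece that actually surfaces these attributes: AbstractRecordSetWriter invokes it when finishRecordSet() is called and folds the returned map into the WriteResult. A minimal sketch of the resulting caller-side flow, using only API calls that appear in this change; the helper class and method names are hypothetical.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.util.Collections;
import java.util.Map;

import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSchema;

class WriteResultAttributesSketch {
    // Hypothetical helper: writes records as a record set and returns the
    // attributes of the resulting WriteResult. With this change, a Parquet
    // writer configured with a Schema Write Strategy contributes schema
    // attributes here via onFinishRecordSet().
    static Map<String, String> writeAndCollectAttributes(final RecordSetWriterFactory factory,
                                                         final ComponentLog logger,
                                                         final RecordSchema schema,
                                                         final OutputStream out,
                                                         final Iterable<Record> records)
            throws IOException, SchemaNotFoundException {
        try (final RecordSetWriter writer = factory.createWriter(logger, schema, out, Collections.emptyMap())) {
            writer.beginRecordSet();
            for (final Record record : records) {
                writer.write(record);
            }
            // finishRecordSet() invokes the new onFinishRecordSet() hook,
            // so the WriteResult now carries the schema attributes
            final WriteResult result = writer.finishRecordSet();
            return result.getAttributes();
        }
    }
}
```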
TestParquetRecordSetWriter.java

@@ -31,6 +31,7 @@ import org.apache.nifi.schema.access.SchemaAccessUtils;
 import org.apache.nifi.schema.access.SchemaNotFoundException;
 import org.apache.nifi.serialization.RecordSetWriter;
 import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.WriteResult;
 import org.apache.nifi.serialization.record.MapRecord;
 import org.apache.nifi.serialization.record.Record;
 import org.apache.nifi.serialization.record.RecordSchema;
@@ -55,6 +56,7 @@ import java.util.HashMap;
 import java.util.Map;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 
 public class TestParquetRecordSetWriter {
@@ -73,16 +75,17 @@ public class TestParquetRecordSetWriter {
 
     @Test
     public void testWriteUsers() throws IOException, SchemaNotFoundException, InitializationException {
-        initRecordSetWriter();
+        initRecordSetWriter(true);
         final RecordSchema writeSchema = recordSetWriterFactory.getSchema(Collections.emptyMap(), null);
         final File parquetFile = new File("target/testWriterUsers-" + System.currentTimeMillis());
-        writeUsers(writeSchema, parquetFile);
+        final WriteResult writeResult = writeUsers(writeSchema, parquetFile);
+        assertWriteAttributesFound(writeResult);
         verifyParquetRecords(parquetFile);
     }
 
     @Test
     public void testWriteUsersWhenSchemaFormatNotAvro() throws IOException, SchemaNotFoundException, InitializationException {
-        initRecordSetWriter();
+        initRecordSetWriter(false);
         final RecordSchema writeSchema = recordSetWriterFactory.getSchema(Collections.emptyMap(), null);
         final RecordSchema writeSchemaWithOtherFormat = new SimpleRecordSchema(writeSchema.getFields(), null, "OTHER-FORMAT", SchemaIdentifier.EMPTY);
         final File parquetFile = new File("target/testWriterUsers-" + System.currentTimeMillis());
@@ -90,7 +93,7 @@ public class TestParquetRecordSetWriter {
         verifyParquetRecords(parquetFile);
     }
 
-    private void initRecordSetWriter() throws IOException, InitializationException {
+    private void initRecordSetWriter(final boolean writeSchemaNameStrategy) throws IOException, InitializationException {
         final TestRunner runner = TestRunners.newTestRunner(new AbstractProcessor() {
             @Override
             public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
@@ -103,12 +106,18 @@ public class TestParquetRecordSetWriter {
         final Map<PropertyDescriptor, String> properties = createPropertiesWithSchema(schemaFile);
         properties.forEach((k, v) -> runner.setProperty(recordSetWriterFactory, k, v));
 
+        if (writeSchemaNameStrategy) {
+            runner.setProperty(recordSetWriterFactory, "Schema Write Strategy", "schema-name");
+        }
+
         runner.enableControllerService(recordSetWriterFactory);
     }
 
-    private void writeUsers(final RecordSchema writeSchema, final File parquetFile) throws IOException {
+    private WriteResult writeUsers(final RecordSchema writeSchema, final File parquetFile) throws IOException {
+        final WriteResult writeResult;
         try(final OutputStream output = new FileOutputStream(parquetFile);
             final RecordSetWriter recordSetWriter = recordSetWriterFactory.createWriter(componentLog, writeSchema, output, Collections.emptyMap())) {
+            recordSetWriter.beginRecordSet();
             for (int i = 0; i < USERS; i++) {
                 final Map<String, Object> userFields = new HashMap<>();
                 userFields.put("name", "user" + i);
@@ -120,7 +129,9 @@ public class TestParquetRecordSetWriter {
             }
 
             recordSetWriter.flush();
+            writeResult = recordSetWriter.finishRecordSet();
         }
+        return writeResult;
     }
 
     private void verifyParquetRecords(final File parquetFile) throws IOException {
@@ -149,4 +160,9 @@ public class TestParquetRecordSetWriter {
         propertyValues.put(SchemaAccessUtils.SCHEMA_TEXT, schemaText);
         return propertyValues;
     }
+
+    private void assertWriteAttributesFound(final WriteResult writeResult) {
+        final Map<String, String> attributes = writeResult.getAttributes();
+        assertFalse(attributes.isEmpty(), "Write Attributes not found");
+    }
 }
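Downstream, a processor can copy those attributes onto the FlowFile it emits. This is a hypothetical sketch of that pattern using the standard ProcessSession API, not code from this commit:

```java
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.serialization.WriteResult;

class ApplyWriteResultAttributes {
    // Hypothetical helper: merges the WriteResult attributes (which now include
    // the schema attributes from onFinishRecordSet) into the outgoing FlowFile.
    static FlowFile apply(final ProcessSession session, final FlowFile flowFile, final WriteResult writeResult) {
        return session.putAllAttributes(flowFile, writeResult.getAttributes());
    }
}
```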