NIFI-11209: Include newly-added columns in output for UpdateTable processors when Update Field Names is true

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #6986.
This commit is contained in:
Matthew Burgess 2023-02-23 15:56:18 -05:00 committed by Pierre Villard
parent c8b5c0ce7b
commit 61b87e007b
No known key found for this signature in database
GPG Key ID: F92A93B30C07C6D5
4 changed files with 91 additions and 0 deletions

View File

@@ -646,6 +646,7 @@ public class UpdateHive3Table extends AbstractProcessor {
if (!hiveColumns.contains(recordFieldName) && !partitionColumns.contains(recordFieldName)) {
// The field does not exist in the table (and is not a partition column), add it
columnsToAdd.add("`" + recordFieldName + "` " + NiFiOrcUtils.getHiveTypeFromFieldType(recordField.getDataType(), true));
hiveColumns.add(recordFieldName);
getLogger().info("Adding column " + recordFieldName + " to table " + tableName);
}
}

View File

@@ -493,6 +493,46 @@ public class TestUpdateHive3Table {
assertTrue(flowFile.getContent().startsWith("name,favorite_number,favorite_color,scale\n"));
}
@Test
public void testAddColumnUpdateFields() throws Exception {
    // Target the "messages" table and register a mock Hive connection pool
    // that records every SQL statement the processor executes.
    configure(processor, 1);
    runner.setProperty(UpdateHive3Table.TABLE_NAME, "messages");
    final MockHiveConnectionPool service = new MockHiveConnectionPool("test");
    runner.addControllerService("dbcp", service);
    runner.enableControllerService(service);
    runner.setProperty(UpdateHive3Table.HIVE_DBCP_SERVICE, "dbcp");
    runner.setProperty(UpdateHive3Table.PARTITION_CLAUSE, "continent, country");

    // A record writer is required when Update Field Names is enabled, because
    // the processor rewrites the outgoing records with the table's column names.
    final RecordSetWriterFactory recordWriter = new CSVRecordSetWriter();
    runner.addControllerService("writer", recordWriter);
    runner.enableControllerService(recordWriter);
    runner.setProperty(UpdateHive3Table.RECORD_WRITER_FACTORY, "writer");
    runner.setProperty(UpdateHive3Table.UPDATE_FIELD_NAMES, "true");

    // First FlowFile carries no partition attributes; the second supplies them.
    runner.enqueue(new byte[0]);
    runner.run();
    final HashMap<String, String> partitionAttributes = new HashMap<>();
    partitionAttributes.put("continent", "Asia");
    partitionAttributes.put("country", "China");
    runner.enqueue(new byte[0], partitionAttributes);
    runner.run();

    runner.assertTransferCount(UpdateHive3Table.REL_SUCCESS, 1);
    final MockFlowFile flowFile = runner.getFlowFilesForRelationship(UpdateHive3Table.REL_SUCCESS).get(0);
    flowFile.assertAttributeEquals(UpdateHive3Table.ATTR_OUTPUT_TABLE, "messages");
    flowFile.assertAttributeEquals(UpdateHive3Table.ATTR_OUTPUT_PATH,
            "hdfs://mycluster:8020/warehouse/tablespace/managed/hive/messages/continent=Asia/country=China");

    final List<String> statements = service.getExecutedStatements();
    assertEquals(2, statements.size());
    // All columns from users table/data should be added to the table, and a new partition should be added
    assertEquals("ALTER TABLE `messages` ADD COLUMNS (`name` STRING, `favorite_number` INT, `favorite_color` STRING, `scale` DOUBLE)",
            statements.get(0));
    assertEquals("ALTER TABLE `messages` ADD IF NOT EXISTS PARTITION (`continent`='Asia', `country`='China')",
            statements.get(1));
    // The input reader is for a different table, so none of the columns match. This results in an empty output FlowFile
    flowFile.assertContentEquals("");
}
// Test-only subclass of UpdateHive3Table with no overrides; presumably exists so
// the test class can instantiate the processor directly — confirm against the
// full test file, which is not visible here.
private static final class MockUpdateHive3Table extends UpdateHive3Table {
}

View File

@@ -520,6 +520,7 @@ public class UpdateDatabaseTable extends AbstractProcessor {
ColumnDescription columnToAdd = new ColumnDescription(recordFieldName, DataTypeUtils.getSQLTypeValue(recordField.getDataType()),
recordField.getDefaultValue() != null, null, recordField.isNullable());
columnsToAdd.add(columnToAdd);
dbColumns.add(recordFieldName);
getLogger().debug("Adding column " + recordFieldName + " to table " + tableName);
}
}

View File

@@ -21,6 +21,7 @@ import org.apache.nifi.dbcp.DBCPService;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processors.standard.db.impl.DerbyDatabaseAdapter;
import org.apache.nifi.serialization.record.MockRecordParser;
import org.apache.nifi.serialization.record.MockRecordWriter;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.util.MockFlowFile;
@@ -359,6 +360,54 @@ public class TestUpdateDatabaseTable {
}
}
@Test
public void testAddColumnToExistingTableUpdateFieldNames() throws Exception {
    runner = TestRunners.newTestRunner(processor);
    try (final Connection connection = service.getConnection()) {
        // Create the target table up front so the processor takes the
        // alter-existing-table path rather than creating a new one.
        try (final Statement statement = connection.createStatement()) {
            statement.executeUpdate(createPersons);
        }

        // The reader schema includes "newField", which the existing table lacks.
        final MockRecordParser readerFactory = new MockRecordParser();
        readerFactory.addSchemaField(new RecordField("id", RecordFieldType.INT.getDataType(), false));
        readerFactory.addSchemaField(new RecordField("name", RecordFieldType.STRING.getDataType(), true));
        readerFactory.addSchemaField(new RecordField("code", RecordFieldType.INT.getDataType(), 0, true));
        readerFactory.addSchemaField(new RecordField("newField", RecordFieldType.STRING.getDataType(), 0, true));
        readerFactory.addRecord(1, "name1", null, "test");
        runner.addControllerService("mock-reader-factory", readerFactory);
        runner.enableControllerService(readerFactory);
        runner.setProperty(UpdateDatabaseTable.RECORD_READER, "mock-reader-factory");

        runner.setProperty(UpdateDatabaseTable.TABLE_NAME, "${table.name}");
        runner.setProperty(UpdateDatabaseTable.CREATE_TABLE, UpdateDatabaseTable.FAIL_IF_NOT_EXISTS);
        runner.setProperty(UpdateDatabaseTable.QUOTE_TABLE_IDENTIFIER, "true");
        runner.setProperty(UpdateDatabaseTable.QUOTE_COLUMN_IDENTIFIERS, "false");
        runner.setProperty(UpdateDatabaseTable.UPDATE_FIELD_NAMES, "true");

        // Writer is needed so the processor can emit records with updated field names.
        final MockRecordWriter writerFactory = new MockRecordWriter();
        runner.addControllerService("mock-writer-factory", writerFactory);
        runner.enableControllerService(writerFactory);
        runner.setProperty(UpdateDatabaseTable.RECORD_WRITER_FACTORY, "mock-writer-factory");

        runner.setProperty(UpdateDatabaseTable.DB_TYPE, new DerbyDatabaseAdapter().getName());
        runner.addControllerService("dbcp", service);
        runner.enableControllerService(service);
        runner.setProperty(UpdateDatabaseTable.DBCP_SERVICE, "dbcp");

        final Map<String, String> flowFileAttributes = new HashMap<>();
        flowFileAttributes.put("db.name", "default");
        flowFileAttributes.put("table.name", "persons");
        runner.enqueue(new byte[0], flowFileAttributes);
        runner.run();

        runner.assertTransferCount(UpdateDatabaseTable.REL_SUCCESS, 1);
        final MockFlowFile flowFile = runner.getFlowFilesForRelationship(UpdateDatabaseTable.REL_SUCCESS).get(0);
        // Ensure the additional field is written out to the FlowFile
        flowFile.assertContentEquals("\"1\",\"name1\",\"0\",\"test\"\n");
    }
}
/**
* Simple implementation only for testing purposes
*/