NIFI-4572: Include database and table names in CaptureChangeMySQL events even when cache is not configured

This closes #6786.

Signed-off-by: Tamas Palfy <tpalfy@apache.org>
This commit is contained in:
Matthew Burgess 2022-12-15 10:04:34 -05:00 committed by Tamas Palfy
parent caee606706
commit 2a88980024
2 changed files with 10 additions and 2 deletions

View File

@ -282,8 +282,9 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
public static final PropertyDescriptor DIST_CACHE_CLIENT = new PropertyDescriptor.Builder()
.name("capture-change-mysql-dist-map-cache-client")
.displayName("Distributed Map Cache Client")
.description("Identifies a Distributed Map Cache Client controller service to be used for keeping information about the various tables, columns, etc. "
+ "needed by the processor. If a client is not specified, the generated events will not include column type or name information.")
.description("Identifies a Distributed Map Cache Client controller service to be used for keeping information about the various table columns, datatypes, etc. "
+ "needed by the processor. If a client is not specified, the generated events will not include column type or name information (but they will include database "
+ "and table information.")
.identifiesControllerService(DistributedMapCacheClient.class)
.required(false)
.build();
@ -918,6 +919,9 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
throw new IOException(se.getMessage(), se);
}
}
} else {
// Populate a limited version of TableInfo without column information
currentTable = new TableInfo(key.getDatabaseName(), key.getTableName(), key.getTableId(), Collections.emptyList());
}
} else {
// Clear the current table, to force a reload next time we get a TABLE_MAP event we care about

View File

@ -262,6 +262,10 @@ class CaptureChangeMySQLTest {
def resultFiles = testRunner.getFlowFilesForRelationship(CaptureChangeMySQL.REL_SUCCESS)
assertEquals(1, resultFiles.size())
assertEquals('10', resultFiles[0].getAttribute(EventWriter.SEQUENCE_ID_KEY))
// Verify the contents of the event includes the database and table name even though the cache is not configured
def json = new JsonSlurper().parseText(resultFiles[0].getContent())
assertEquals('myDB', json['database'])
assertEquals('myTable', json['table_name'])
}
@Test