diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java index 08dad6dc40..978b8bc8d7 100644 --- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java +++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java @@ -282,8 +282,9 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor { public static final PropertyDescriptor DIST_CACHE_CLIENT = new PropertyDescriptor.Builder() .name("capture-change-mysql-dist-map-cache-client") .displayName("Distributed Map Cache Client") - .description("Identifies a Distributed Map Cache Client controller service to be used for keeping information about the various tables, columns, etc. " - + "needed by the processor. If a client is not specified, the generated events will not include column type or name information.") + .description("Identifies a Distributed Map Cache Client controller service to be used for keeping information about the various table columns, datatypes, etc. " + + "needed by the processor. If a client is not specified, the generated events will not include column type or name information (but they will include database " + + "and table information.") .identifiesControllerService(DistributedMapCacheClient.class) .required(false) .build(); @@ -918,6 +919,9 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor { throw new IOException(se.getMessage(), se); } } + } else { + // Populate a limited version of TableInfo without column information + currentTable = new TableInfo(key.getDatabaseName(), key.getTableName(), key.getTableId(), Collections.emptyList()); } } else { // Clear the current table, to force a reload next time we get a TABLE_MAP event we care about diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy index cc03e2aaa6..66828c3570 100644 --- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy +++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy @@ -262,6 +262,10 @@ class CaptureChangeMySQLTest { def resultFiles = testRunner.getFlowFilesForRelationship(CaptureChangeMySQL.REL_SUCCESS) assertEquals(1, resultFiles.size()) assertEquals('10', resultFiles[0].getAttribute(EventWriter.SEQUENCE_ID_KEY)) + // Verify the contents of the event includes the database and table name even though the cache is not configured + def json = new JsonSlurper().parseText(resultFiles[0].getContent()) + assertEquals('myDB', json['database']) + assertEquals('myTable', json['table_name']) } @Test