NIFI-8253: Restore call to session.commit() in GenerateTableFetch

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #4840.
This commit is contained in:
Matthew Burgess 2021-02-23 15:03:36 -05:00 committed by Pierre Villard
parent f101a2bba5
commit 2dedd8bf0f
No known key found for this signature in database
GPG Key ID: F92A93B30C07C6D5
3 changed files with 16 additions and 1 deletions

View File

@ -47,7 +47,7 @@ public class MockSessionFactory implements ProcessSessionFactory {
return session; return session;
} }
Set<MockProcessSession> getCreatedSessions() { public Set<MockProcessSession> getCreatedSessions() {
return Collections.unmodifiableSet(createdSessions); return Collections.unmodifiableSet(createdSessions);
} }
} }

View File

@ -554,6 +554,9 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor {
logger.error("{} failed to update State Manager, observed maximum values will not be recorded. " logger.error("{} failed to update State Manager, observed maximum values will not be recorded. "
+ "Also, any generated SQL statements may be duplicated.", this, ioe); + "Also, any generated SQL statements may be duplicated.", this, ioe);
} }
session.commit();
} catch (final ProcessException pe) { } catch (final ProcessException pe) {
// Log the cause of the ProcessException if it is available // Log the cause of the ProcessException if it is available
Throwable t = (pe.getCause() == null ? pe : pe.getCause()); Throwable t = (pe.getCause() == null ? pe : pe.getCause());

View File

@ -25,6 +25,8 @@ import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processors.standard.db.impl.DerbyDatabaseAdapter; import org.apache.nifi.processors.standard.db.impl.DerbyDatabaseAdapter;
import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.MockProcessSession;
import org.apache.nifi.util.MockSessionFactory;
import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners; import org.apache.nifi.util.TestRunners;
import org.apache.nifi.util.file.FileUtils; import org.apache.nifi.util.file.FileUtils;
@ -46,6 +48,7 @@ import java.sql.Types;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set;
import static org.apache.nifi.processors.standard.AbstractDatabaseFetchProcessor.DB_TYPE; import static org.apache.nifi.processors.standard.AbstractDatabaseFetchProcessor.DB_TYPE;
import static org.apache.nifi.processors.standard.AbstractDatabaseFetchProcessor.FRAGMENT_COUNT; import static org.apache.nifi.processors.standard.AbstractDatabaseFetchProcessor.FRAGMENT_COUNT;
@ -141,6 +144,15 @@ public class TestGenerateTableFetch {
runner.setProperty(GenerateTableFetch.MAX_VALUE_COLUMN_NAMES, "ID"); runner.setProperty(GenerateTableFetch.MAX_VALUE_COLUMN_NAMES, "ID");
runner.run(); runner.run();
// Assert all the sessions were committed
MockSessionFactory runnerSessionFactory = (MockSessionFactory) runner.getProcessSessionFactory();
Set<MockProcessSession> sessions = runnerSessionFactory.getCreatedSessions();
for (MockProcessSession session : sessions) {
session.assertCommitted();
}
// Verify the expected FlowFile
runner.assertAllFlowFilesTransferred(REL_SUCCESS, 1); runner.assertAllFlowFilesTransferred(REL_SUCCESS, 1);
MockFlowFile flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(0); MockFlowFile flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
String query = new String(flowFile.toByteArray()); String query = new String(flowFile.toByteArray());