NIFI-8187: Fixed PutSQL duplicating provenance events

Also fixed provenance event mocking in test framework.

Signed-off-by: Matthew Burgess <mattyb149@apache.org>

This closes #4796
This commit is contained in:
Peter Turcsanyi 2021-02-01 17:03:06 +01:00 committed by Matthew Burgess
parent 434c995185
commit 77eb8af275
No known key found for this signature in database
GPG Key ID: 05D3DEB8126DAD24
4 changed files with 100 additions and 131 deletions

View File

@ -274,6 +274,7 @@ public class MockProcessSession implements ProcessSession {
}
sharedState.addProvenanceEvents(provenanceReporter.getEvents());
provenanceReporter.clear();
counterMap.clear();
}

View File

@ -18,9 +18,7 @@ package org.apache.nifi.util;
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@ -37,7 +35,8 @@ public class SharedSessionState {
private final Processor processor;
private final AtomicLong flowFileIdGenerator;
private final ConcurrentMap<String, AtomicLong> counterMap = new ConcurrentHashMap<>();
private final Set<ProvenanceEventRecord> events = new LinkedHashSet<>();
// list of provenance events as they were in the provenance repository (events emitted with force=true or committed with the session)
private final List<ProvenanceEventRecord> events = new ArrayList<>();
public SharedSessionState(final Processor processor, final AtomicLong flowFileIdGenerator) {
flowFileQueue = new MockFlowFileQueue();

View File

@ -386,6 +386,8 @@ public class PutSQL extends AbstractSessionFactoryProcessor {
final PutGroup.PutFlowFiles<FunctionContext, Connection, StatementFlowFileEnclosure> putFlowFiles = (context, session, fc, conn, enclosure, result) -> {
final List<FlowFile> sentFlowFiles = new ArrayList<>();
if (fc.isSupportBatching()) {
// We have PreparedStatement that have batches added to them.
@ -393,6 +395,7 @@ public class PutSQL extends AbstractSessionFactoryProcessor {
exceptionHandler.execute(fc, enclosure, input -> {
try (final PreparedStatement stmt = enclosure.getCachedStatement(conn)) {
stmt.executeBatch();
sentFlowFiles.addAll(enclosure.getFlowFiles());
result.routeTo(enclosure.getFlowFiles(), REL_SUCCESS);
}
}, onBatchUpdateError(context, session, result));
@ -423,6 +426,7 @@ public class PutSQL extends AbstractSessionFactoryProcessor {
sentFlowFile = session.putAttribute(sentFlowFile, "sql.generated.key", generatedKey);
}
sentFlowFiles.add(sentFlowFile);
result.routeTo(sentFlowFile, REL_SUCCESS);
}
@ -430,7 +434,7 @@ public class PutSQL extends AbstractSessionFactoryProcessor {
}
}
if (result.contains(REL_SUCCESS)) {
if (!sentFlowFiles.isEmpty()) {
// Determine the database URL
String url = "jdbc://unknown-host";
try {
@ -440,7 +444,7 @@ public class PutSQL extends AbstractSessionFactoryProcessor {
// Emit a Provenance SEND event
final long transmissionMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - fc.startNanos);
for (final FlowFile flowFile : result.getRoutedFlowFiles().get(REL_SUCCESS)) {
for (final FlowFile flowFile : sentFlowFiles) {
session.getProvenanceReporter().send(flowFile, url, transmissionMillis, true);
}
}

View File

@ -45,6 +45,7 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.xml.bind.DatatypeConverter;
@ -55,6 +56,8 @@ import org.apache.nifi.dbcp.DBCPService;
import org.apache.nifi.processor.FlowFileFilter;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.pattern.RollbackOnFailure;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
@ -94,10 +97,7 @@ public class TestPutSQL {
@Test
public void testDirectStatements() throws InitializationException, ProcessException, SQLException, IOException {
final TestRunner runner = TestRunners.newTestRunner(PutSQL.class);
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp");
final TestRunner runner = initTestRunner();
recreateTable("PERSONS", createPersons);
@ -134,10 +134,7 @@ public class TestPutSQL {
@Test
public void testCommitOnCleanup() throws InitializationException, ProcessException, SQLException {
final TestRunner runner = TestRunners.newTestRunner(PutSQL.class);
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp");
final TestRunner runner = initTestRunner();
runner.setProperty(PutSQL.AUTO_COMMIT, "false");
recreateTable("PERSONS", createPersons);
@ -161,11 +158,8 @@ public class TestPutSQL {
@Test
public void testInsertWithGeneratedKeys() throws InitializationException, ProcessException, SQLException, IOException {
final TestRunner runner = TestRunners.newTestRunner(PutSQL.class);
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
final TestRunner runner = initTestRunner();
runner.setProperty(PutSQL.OBTAIN_GENERATED_KEYS, "true");
runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp");
recreateTable("PERSONS_AI",createPersonsAutoId);
runner.enqueue("INSERT INTO PERSONS_AI (NAME, CODE) VALUES ('Mark', 84)".getBytes());
@ -187,10 +181,56 @@ public class TestPutSQL {
}
}
@Test
public void testProvenanceEventsWithBatchMode() throws InitializationException, ProcessException, SQLException {
final TestRunner runner = initTestRunner();
runner.setProperty(PutSQL.BATCH_SIZE, "10");
runner.setProperty(PutSQL.SUPPORT_TRANSACTIONS, "false");
runner.setProperty(PutSQL.OBTAIN_GENERATED_KEYS, "false");
testProvenanceEvents(runner);
}
@Test
public void testProvenanceEventsWithFragmentedTransactions() throws InitializationException, ProcessException, SQLException {
final TestRunner runner = initTestRunner();
runner.setProperty(PutSQL.BATCH_SIZE, "10");
runner.setProperty(PutSQL.SUPPORT_TRANSACTIONS, "true");
runner.setProperty(PutSQL.OBTAIN_GENERATED_KEYS, "false");
testProvenanceEvents(runner);
}
@Test
public void testProvenanceEventsWithObtainGeneratedKeys() throws InitializationException, ProcessException, SQLException {
final TestRunner runner = initTestRunner();
runner.setProperty(PutSQL.BATCH_SIZE, "10");
runner.setProperty(PutSQL.SUPPORT_TRANSACTIONS, "false");
runner.setProperty(PutSQL.OBTAIN_GENERATED_KEYS, "true");
testProvenanceEvents(runner);
}
private void testProvenanceEvents(final TestRunner runner) throws InitializationException, ProcessException, SQLException {
recreateTable("PERSONS", createPersons);
runner.enqueue("DELETE FROM PERSONS WHERE ID = 1");
runner.enqueue("INSERT INTO PERSONS (ID, NAME, CODE) VALUES (1, 'Mark', 84)");
runner.run();
runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, 2);
List<ProvenanceEventRecord> provenanceEvents = runner.getProvenanceEvents();
assertEquals(2, provenanceEvents.size());
for (ProvenanceEventRecord event: provenanceEvents) {
assertEquals(ProvenanceEventType.SEND, event.getEventType());
}
}
@Test
public void testFailInMiddleWithBadStatementAndSupportTransaction() throws InitializationException, ProcessException, SQLException, IOException {
final TestRunner runner = TestRunners.newTestRunner(PutSQL.class);
final TestRunner runner = initTestRunner();
testFailInMiddleWithBadStatement(runner);
runner.run();
@ -200,7 +240,7 @@ public class TestPutSQL {
@Test
public void testFailInMiddleWithBadStatementAndNotSupportTransaction() throws InitializationException, ProcessException, SQLException, IOException {
final TestRunner runner = TestRunners.newTestRunner(PutSQL.class);
final TestRunner runner = initTestRunner();
runner.setProperty(PutSQL.SUPPORT_TRANSACTIONS, "false");
testFailInMiddleWithBadStatement(runner);
runner.run();
@ -210,10 +250,7 @@ public class TestPutSQL {
}
private void testFailInMiddleWithBadStatement(final TestRunner runner) throws InitializationException {
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
runner.setProperty(PutSQL.OBTAIN_GENERATED_KEYS, "false");
runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp");
runner.enqueue("INSERT INTO PERSONS_AI (NAME, CODE) VALUES ('Mark', 84)".getBytes());
runner.enqueue("INSERT INTO PERSONS_AI".getBytes()); // intentionally wrong syntax
runner.enqueue("INSERT INTO PERSONS_AI (NAME, CODE) VALUES ('Tom', 3)".getBytes());
@ -224,11 +261,8 @@ public class TestPutSQL {
@Test
public void testFailInMiddleWithBadStatementRollbackOnFailure() throws InitializationException, ProcessException, SQLException, IOException {
final TestRunner runner = TestRunners.newTestRunner(PutSQL.class);
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
final TestRunner runner = initTestRunner();
runner.setProperty(PutSQL.OBTAIN_GENERATED_KEYS, "false");
runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp");
runner.setProperty(RollbackOnFailure.ROLLBACK_ON_FAILURE, "true");
runner.enqueue("INSERT INTO PERSONS_AI (NAME, CODE) VALUES ('Mark', 84)".getBytes());
runner.enqueue("INSERT INTO PERSONS_AI".getBytes()); // intentionally wrong syntax
@ -247,7 +281,7 @@ public class TestPutSQL {
@Test
public void testFailInMiddleWithBadParameterTypeAndNotSupportTransaction() throws InitializationException, ProcessException, SQLException, IOException {
final TestRunner runner = TestRunners.newTestRunner(PutSQL.class);
final TestRunner runner = initTestRunner();
runner.setProperty(PutSQL.SUPPORT_TRANSACTIONS, "false");
testFailInMiddleWithBadParameterType(runner);
runner.run();
@ -258,7 +292,7 @@ public class TestPutSQL {
@Test
public void testFailInMiddleWithBadParameterTypeAndSupportTransaction() throws InitializationException, ProcessException, SQLException, IOException {
final TestRunner runner = TestRunners.newTestRunner(PutSQL.class);
final TestRunner runner = initTestRunner();
testFailInMiddleWithBadParameterType(runner);
runner.run();
@ -267,10 +301,7 @@ public class TestPutSQL {
}
private void testFailInMiddleWithBadParameterType(final TestRunner runner) throws InitializationException, ProcessException, SQLException, IOException {
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
runner.setProperty(PutSQL.OBTAIN_GENERATED_KEYS, "false");
runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp");
final Map<String, String> goodAttributes = new HashMap<>();
goodAttributes.put("sql.args.1.type", String.valueOf(Types.INTEGER));
@ -289,11 +320,8 @@ public class TestPutSQL {
@Test
public void testFailInMiddleWithBadParameterTypeRollbackOnFailure() throws InitializationException, ProcessException, SQLException, IOException {
final TestRunner runner = TestRunners.newTestRunner(PutSQL.class);
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
final TestRunner runner = initTestRunner();
runner.setProperty(PutSQL.OBTAIN_GENERATED_KEYS, "false");
runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp");
runner.setProperty(RollbackOnFailure.ROLLBACK_ON_FAILURE, "true");
final Map<String, String> goodAttributes = new HashMap<>();
@ -322,7 +350,7 @@ public class TestPutSQL {
@Test
public void testFailInMiddleWithBadParameterValueAndSupportTransaction() throws InitializationException, ProcessException, SQLException, IOException {
final TestRunner runner = TestRunners.newTestRunner(PutSQL.class);
final TestRunner runner = initTestRunner();
testFailInMiddleWithBadParameterValue(runner);
runner.run();
@ -340,7 +368,7 @@ public class TestPutSQL {
@Test
public void testFailInMiddleWithBadParameterValueAndNotSupportTransaction() throws InitializationException, ProcessException, SQLException, IOException {
final TestRunner runner = TestRunners.newTestRunner(PutSQL.class);
final TestRunner runner = initTestRunner();
runner.setProperty(PutSQL.SUPPORT_TRANSACTIONS, "false");
testFailInMiddleWithBadParameterValue(runner);
runner.run();
@ -362,10 +390,7 @@ public class TestPutSQL {
}
private void testFailInMiddleWithBadParameterValue(final TestRunner runner) throws InitializationException, ProcessException, SQLException, IOException {
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
runner.setProperty(PutSQL.OBTAIN_GENERATED_KEYS, "false");
runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp");
recreateTable("PERSONS_AI",createPersonsAutoId);
final Map<String, String> goodAttributes = new HashMap<>();
goodAttributes.put("sql.args.1.type", String.valueOf(Types.INTEGER));
@ -384,11 +409,8 @@ public class TestPutSQL {
@Test
public void testFailInMiddleWithBadParameterValueRollbackOnFailure() throws InitializationException, ProcessException, SQLException, IOException {
final TestRunner runner = TestRunners.newTestRunner(PutSQL.class);
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
final TestRunner runner = initTestRunner();
runner.setProperty(PutSQL.OBTAIN_GENERATED_KEYS, "false");
runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp");
runner.setProperty(RollbackOnFailure.ROLLBACK_ON_FAILURE, "true");
recreateTable("PERSONS_AI",createPersonsAutoId);
@ -427,16 +449,13 @@ public class TestPutSQL {
@Test
public void testUsingSqlDataTypesWithNegativeValues() throws InitializationException, ProcessException, SQLException, IOException {
final TestRunner runner = TestRunners.newTestRunner(PutSQL.class);
final TestRunner runner = initTestRunner();
try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
stmt.executeUpdate("CREATE TABLE PERSONS2 (id integer primary key, name varchar(100), code bigint)");
}
}
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp");
final Map<String, String> attributes = new HashMap<>();
attributes.put("sql.args.1.type", "-5");
attributes.put("sql.args.1.value", "84");
@ -460,17 +479,13 @@ public class TestPutSQL {
// Not specifying a format for the date fields here to continue to test backwards compatibility
@Test
public void testUsingTimestampValuesEpochAndString() throws InitializationException, ProcessException, SQLException, IOException, ParseException {
final TestRunner runner = TestRunners.newTestRunner(PutSQL.class);
final TestRunner runner = initTestRunner();
try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
stmt.executeUpdate("CREATE TABLE TIMESTAMPTEST1 (id integer primary key, ts1 timestamp, ts2 timestamp)");
}
}
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp");
final String arg2TS = "2001-01-01 00:01:01.001";
final String art3TS = "2002-02-02 12:02:02.002";
SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
@ -501,17 +516,13 @@ public class TestPutSQL {
@Test
public void testUsingTimestampValuesWithFormatAttribute() throws InitializationException, ProcessException, SQLException, IOException, ParseException {
final TestRunner runner = TestRunners.newTestRunner(PutSQL.class);
final TestRunner runner = initTestRunner();
try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
stmt.executeUpdate("CREATE TABLE TIMESTAMPTEST2 (id integer primary key, ts1 timestamp, ts2 timestamp)");
}
}
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp");
final String dateStr1 = "2002-02-02T12:02:02";
final String dateStrTimestamp1 = "2002-02-02 12:02:02";
final long dateInt1 = Timestamp.valueOf(dateStrTimestamp1).getTime();
@ -549,17 +560,13 @@ public class TestPutSQL {
@Test
public void testUsingDateTimeValuesWithFormatAttribute() throws InitializationException, ProcessException, SQLException, IOException, ParseException {
final TestRunner runner = TestRunners.newTestRunner(PutSQL.class);
final TestRunner runner = initTestRunner();
try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
stmt.executeUpdate("CREATE TABLE TIMESTAMPTEST3 (id integer primary key, ts1 TIME, ts2 DATE)");
}
}
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp");
final String dateStr = "2002-03-04";
final String timeStr = "02:03:04";
final String timeFormatString = "HH:mm:ss";
@ -645,17 +652,13 @@ public class TestPutSQL {
@Test
public void testBitType() throws SQLException, InitializationException {
final TestRunner runner = TestRunners.newTestRunner(PutSQL.class);
final TestRunner runner = initTestRunner();
try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
stmt.executeUpdate("CREATE TABLE BITTESTS (id integer primary key, bt1 BOOLEAN)");
}
}
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp");
final byte[] insertStatement = "INSERT INTO BITTESTS (ID, bt1) VALUES (?, ?)".getBytes();
Map<String, String> attributes = new HashMap<>();
@ -762,17 +765,13 @@ public class TestPutSQL {
@Test
public void testUsingTimeValuesEpochAndString() throws InitializationException, ProcessException, SQLException, IOException, ParseException {
final TestRunner runner = TestRunners.newTestRunner(PutSQL.class);
final TestRunner runner = initTestRunner();
try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
stmt.executeUpdate("CREATE TABLE TIMETESTS (id integer primary key, ts1 time, ts2 time)");
}
}
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp");
final String arg2TS = "00:01:02";
final String art3TS = "02:03:04";
final String timeFormatString = "HH:mm:ss";
@ -807,17 +806,13 @@ public class TestPutSQL {
@Test
public void testUsingDateValuesEpochAndString() throws InitializationException, ProcessException, SQLException, IOException, ParseException {
final TestRunner runner = TestRunners.newTestRunner(PutSQL.class);
final TestRunner runner = initTestRunner();
try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
stmt.executeUpdate("CREATE TABLE DATETESTS (id integer primary key, ts1 date, ts2 date)");
}
}
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp");
final String arg2TS = "2001-01-01";
final String art3TS = "2002-02-02";
SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd");
@ -848,7 +843,7 @@ public class TestPutSQL {
@Test
public void testBinaryColumnTypes() throws InitializationException, ProcessException, SQLException, IOException, ParseException {
final TestRunner runner = TestRunners.newTestRunner(PutSQL.class);
final TestRunner runner = initTestRunner();
try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
stmt.executeUpdate("CREATE TABLE BINARYTESTS (id integer primary key, bn1 CHAR(8) FOR BIT DATA, bn2 VARCHAR(100) FOR BIT DATA, " +
@ -856,10 +851,6 @@ public class TestPutSQL {
}
}
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp");
final byte[] insertStatement = "INSERT INTO BINARYTESTS (ID, bn1, bn2, bn3) VALUES (?, ?, ?, ?)".getBytes();
final String arg2BIN = fixedSizeByteArrayAsASCIIString(8);
@ -978,10 +969,7 @@ public class TestPutSQL {
@Test
public void testStatementsWithPreparedParameters() throws InitializationException, ProcessException, SQLException, IOException {
final TestRunner runner = TestRunners.newTestRunner(PutSQL.class);
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp");
final TestRunner runner = initTestRunner();
recreateTable("PERSONS", createPersons);
@ -1039,10 +1027,7 @@ public class TestPutSQL {
@Test
public void testMultipleStatementsWithinFlowFile() throws InitializationException, ProcessException, SQLException, IOException {
final TestRunner runner = TestRunners.newTestRunner(PutSQL.class);
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp");
final TestRunner runner = initTestRunner();
recreateTable("PERSONS", createPersons);
@ -1077,10 +1062,7 @@ public class TestPutSQL {
@Test
public void testMultipleStatementsWithinFlowFileRollbackOnFailure() throws InitializationException, ProcessException, SQLException, IOException {
final TestRunner runner = TestRunners.newTestRunner(PutSQL.class);
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp");
final TestRunner runner = initTestRunner();
runner.setProperty(RollbackOnFailure.ROLLBACK_ON_FAILURE, "true");
recreateTable("PERSONS", createPersons);
@ -1119,10 +1101,7 @@ public class TestPutSQL {
@Test
public void testWithNullParameter() throws InitializationException, ProcessException, SQLException, IOException {
final TestRunner runner = TestRunners.newTestRunner(PutSQL.class);
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp");
final TestRunner runner = initTestRunner();
final Map<String, String> attributes = new HashMap<>();
attributes.put("sql.args.1.type", String.valueOf(Types.INTEGER));
attributes.put("sql.args.1.value", "1");
@ -1151,10 +1130,7 @@ public class TestPutSQL {
@Test
public void testInvalidStatement() throws InitializationException, ProcessException, SQLException, IOException {
final TestRunner runner = TestRunners.newTestRunner(PutSQL.class);
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp");
final TestRunner runner = initTestRunner();
recreateTable("PERSONS", createPersons);
@ -1189,10 +1165,7 @@ public class TestPutSQL {
@Test
public void testInvalidStatementRollbackOnFailure() throws InitializationException, ProcessException, SQLException, IOException {
final TestRunner runner = TestRunners.newTestRunner(PutSQL.class);
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp");
final TestRunner runner = initTestRunner();
runner.setProperty(RollbackOnFailure.ROLLBACK_ON_FAILURE, "true");
recreateTable("PERSONS", createPersons);
@ -1299,10 +1272,7 @@ public class TestPutSQL {
@Test
public void testMultipleFlowFilesSuccessfulInTransaction() throws InitializationException, ProcessException, SQLException, IOException {
final TestRunner runner = TestRunners.newTestRunner(PutSQL.class);
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp");
final TestRunner runner = initTestRunner();
runner.setProperty(PutSQL.BATCH_SIZE, "1");
recreateTable("PERSONS", createPersons);
@ -1357,10 +1327,7 @@ public class TestPutSQL {
@Test
public void testMultipleFlowFilesSuccessfulInTransactionRollBackOnFailure() throws InitializationException, ProcessException, SQLException, IOException {
final TestRunner runner = TestRunners.newTestRunner(PutSQL.class);
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp");
final TestRunner runner = initTestRunner();
runner.setProperty(PutSQL.BATCH_SIZE, "1");
runner.setProperty(RollbackOnFailure.ROLLBACK_ON_FAILURE, "true");
@ -1390,12 +1357,9 @@ public class TestPutSQL {
@Test
public void testTransactionTimeout() throws InitializationException, ProcessException, SQLException, IOException {
final TestRunner runner = TestRunners.newTestRunner(PutSQL.class);
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
final TestRunner runner = initTestRunner();
runner.setProperty(PutSQL.TRANSACTION_TIMEOUT, "5 secs");
runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp");
final Map<String, String> attributes = new HashMap<>();
attributes.put("fragment.identifier", "1");
attributes.put("fragment.count", "2");
@ -1427,12 +1391,9 @@ public class TestPutSQL {
@Test
public void testTransactionTimeoutRollbackOnFailure() throws InitializationException, ProcessException, SQLException, IOException {
final TestRunner runner = TestRunners.newTestRunner(PutSQL.class);
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
final TestRunner runner = initTestRunner();
runner.setProperty(PutSQL.TRANSACTION_TIMEOUT, "5 secs");
runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp");
runner.setProperty(RollbackOnFailure.ROLLBACK_ON_FAILURE, "true");
final Map<String, String> attributes = new HashMap<>();
attributes.put("fragment.identifier", "1");
@ -1469,12 +1430,9 @@ public class TestPutSQL {
@Test
public void testNullFragmentCountRollbackOnFailure() throws InitializationException, ProcessException, SQLException, IOException {
final TestRunner runner = TestRunners.newTestRunner(PutSQL.class);
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
final TestRunner runner = initTestRunner();
runner.setProperty(PutSQL.TRANSACTION_TIMEOUT, "5 secs");
runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp");
runner.setProperty(RollbackOnFailure.ROLLBACK_ON_FAILURE, "true");
final Map<String, String> attribute1 = new HashMap<>();
attribute1.put("fragment.identifier", "1");
@ -1502,10 +1460,7 @@ public class TestPutSQL {
@Test
public void testStatementsFromProperty() throws InitializationException, ProcessException, SQLException, IOException {
final TestRunner runner = TestRunners.newTestRunner(PutSQL.class);
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp");
final TestRunner runner = initTestRunner();
runner.setProperty(PutSQL.SQL_STATEMENT, "INSERT INTO PERSONS (ID, NAME, CODE) VALUES (${row.id}, 'Mark', 84)");
recreateTable("PERSONS", createPersons);
@ -1703,4 +1658,14 @@ public class TestPutSQL {
return DatatypeConverter.printBase64Binary(bBinary);
}
private TestRunner initTestRunner() throws InitializationException {
final TestRunner runner = TestRunners.newTestRunner(PutSQL.class);
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp");
return runner;
}
}