diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/test/groovy/org/apache/nifi/record/sink/db/DatabaseRecordSinkTest.groovy b/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/test/groovy/org/apache/nifi/record/sink/db/DatabaseRecordSinkTest.groovy deleted file mode 100644 index 5b1c72eed8..0000000000 --- a/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/test/groovy/org/apache/nifi/record/sink/db/DatabaseRecordSinkTest.groovy +++ /dev/null @@ -1,325 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.record.sink.db - -import org.apache.nifi.attribute.expression.language.StandardPropertyValue -import org.apache.nifi.components.PropertyValue -import org.apache.nifi.components.state.StateManager -import org.apache.nifi.controller.ConfigurationContext -import org.apache.nifi.controller.ControllerServiceInitializationContext -import org.apache.nifi.dbcp.DBCPConnectionPool -import org.apache.nifi.dbcp.DBCPService -import org.apache.nifi.logging.ComponentLog -import org.apache.nifi.record.sink.RecordSinkService -import org.apache.nifi.reporting.InitializationException -import org.apache.nifi.serialization.RecordSetWriterFactory -import org.apache.nifi.serialization.SimpleRecordSchema -import org.apache.nifi.serialization.WriteResult -import org.apache.nifi.serialization.record.ListRecordSet -import org.apache.nifi.serialization.record.MapRecord -import org.apache.nifi.serialization.record.MockRecordWriter -import org.apache.nifi.serialization.record.RecordField -import org.apache.nifi.serialization.record.RecordFieldType -import org.apache.nifi.serialization.record.RecordSchema -import org.apache.nifi.serialization.record.RecordSet -import org.apache.nifi.state.MockStateManager -import org.apache.nifi.util.MockControllerServiceInitializationContext -import org.apache.nifi.util.MockPropertyValue -import org.apache.nifi.util.file.FileUtils -import org.junit.jupiter.api.AfterAll -import org.junit.jupiter.api.BeforeAll -import org.junit.jupiter.api.Test -import org.junit.jupiter.api.function.Executable - -import java.sql.DriverManager -import java.sql.ResultSet -import java.sql.SQLException -import java.sql.SQLNonTransientConnectionException -import java.sql.Statement - -import static org.apache.nifi.dbcp.utils.DBCPProperties.DATABASE_URL -import static org.apache.nifi.dbcp.utils.DBCPProperties.DB_DRIVERNAME -import static org.apache.nifi.dbcp.utils.DBCPProperties.DB_DRIVER_LOCATION -import static org.apache.nifi.dbcp.utils.DBCPProperties.DB_PASSWORD -import static org.apache.nifi.dbcp.utils.DBCPProperties.DB_USER -import static org.apache.nifi.dbcp.utils.DBCPProperties.EVICTION_RUN_PERIOD -import static org.apache.nifi.dbcp.DBCPConnectionPool.KERBEROS_CREDENTIALS_SERVICE -import static org.apache.nifi.dbcp.DBCPConnectionPool.KERBEROS_PRINCIPAL -import static org.apache.nifi.dbcp.DBCPConnectionPool.KERBEROS_PASSWORD -import static org.apache.nifi.dbcp.utils.DBCPProperties.KERBEROS_USER_SERVICE -import static org.apache.nifi.dbcp.utils.DBCPProperties.MAX_CONN_LIFETIME -import static org.apache.nifi.dbcp.utils.DBCPProperties.MAX_IDLE -import static org.apache.nifi.dbcp.utils.DBCPProperties.MAX_TOTAL_CONNECTIONS -import static org.apache.nifi.dbcp.utils.DBCPProperties.MAX_WAIT_TIME -import static org.apache.nifi.dbcp.utils.DBCPProperties.MIN_EVICTABLE_IDLE_TIME -import static org.apache.nifi.dbcp.utils.DBCPProperties.MIN_IDLE -import static org.apache.nifi.dbcp.utils.DBCPProperties.SOFT_MIN_EVICTABLE_IDLE_TIME -import static org.apache.nifi.dbcp.utils.DBCPProperties.VALIDATION_QUERY -import static org.junit.jupiter.api.Assertions.assertEquals -import static org.junit.jupiter.api.Assertions.assertFalse -import static org.junit.jupiter.api.Assertions.assertNotNull -import static org.junit.jupiter.api.Assertions.assertThrows -import static org.junit.jupiter.api.Assertions.assertTrue - -import static org.mockito.Mockito.mock -import static org.mockito.Mockito.when - -class DatabaseRecordSinkTest { - - final static String DB_LOCATION = "target/db" - - DBCPService dbcpService - - @BeforeAll - static void setup() { - System.setProperty("derby.stream.error.file", "target/derby.log") - } - - @AfterAll - static void cleanUpAfterClass() throws Exception { - try { - DriverManager.getConnection("jdbc:derby:" + DB_LOCATION + ";shutdown=true") - } catch (SQLNonTransientConnectionException ignore) { - // Do nothing, this is what happens at Derby shutdown - } - // remove previous test database, if any - final File dbLocation = new File(DB_LOCATION) - try { - FileUtils.deleteFile(dbLocation, true) - } catch (IOException ignore) { - // Do nothing, may not have existed - } - } - - private ConfigurationContext context - - @Test - void testRecordFormat() throws IOException, InitializationException { - DatabaseRecordSink task = initTask('TESTTABLE') - - // Create the table - Class.forName("org.apache.derby.jdbc.EmbeddedDriver") - def con = DriverManager.getConnection("jdbc:derby:${DB_LOCATION};create=true") - final Statement stmt = con.createStatement() - try { - stmt.execute("drop table TESTTABLE") - } catch (final SQLException sqle) { - // Ignore, usually due to Derby not having DROP TABLE IF EXISTS - } - try { - stmt.executeUpdate('CREATE TABLE testTable (field1 integer, field2 varchar(20))') - } finally { - stmt.close() - } - - List recordFields = Arrays.asList( - new RecordField("field1", RecordFieldType.INT.getDataType()), - new RecordField("field2", RecordFieldType.STRING.getDataType()) - ) - RecordSchema recordSchema = new SimpleRecordSchema(recordFields) - - Map row1 = new HashMap<>() - row1.put('field1', 15) - row1.put('field2', 'Hello') - - Map row2 = new HashMap<>() - row2.put('field1', 6) - row2.put('field2', 'World!') - - RecordSet recordSet = new ListRecordSet(recordSchema, Arrays.asList( - new MapRecord(recordSchema, row1), - new MapRecord(recordSchema, row2) - )) - - WriteResult writeResult = task.sendData(recordSet, ['a': 'Hello'], true) - assertNotNull(writeResult) - assertEquals(2, writeResult.recordCount) - assertEquals('Hello', writeResult.attributes['a']) - - final Statement st = con.createStatement() - final ResultSet resultSet = st.executeQuery('select * from testTable') - assertTrue(resultSet.next()) - - def f1 = resultSet.getObject(1) - assertNotNull(f1) - assertTrue(f1 instanceof Integer) - assertEquals(15, f1) - def f2 = resultSet.getObject(2) - assertNotNull(f2) - assertTrue(f2 instanceof String) - assertEquals('Hello', f2) - - assertTrue(resultSet.next()) - - f1 = resultSet.getObject(1) - assertNotNull(f1) - assertTrue(f1 instanceof Integer) - assertEquals(6, f1) - f2 = resultSet.getObject(2) - assertNotNull(f2) - assertTrue(f2 instanceof String) - assertEquals('World!', f2) - - assertFalse(resultSet.next()) - } - - @Test - void testMissingTable() throws IOException, InitializationException { - DatabaseRecordSink task = initTask('NO_SUCH_TABLE') - - List recordFields = Arrays.asList( - new RecordField("field1", RecordFieldType.INT.getDataType()), - new RecordField("field2", RecordFieldType.STRING.getDataType()) - ) - RecordSchema recordSchema = new SimpleRecordSchema(recordFields) - - Map row1 = new HashMap<>() - row1.put('field1', 15) - row1.put('field2', 'Hello') - - RecordSet recordSet = new ListRecordSet(recordSchema, Collections.singletonList(new MapRecord(recordSchema, row1))) - assertThrows(IOException.class, { task.sendData(recordSet, new HashMap<>(), true) } as Executable, - 'Should have generated an exception for table not present') - } - - @Test - void testMissingField() throws IOException, InitializationException { - DatabaseRecordSink task = initTask('TESTTABLE') - - // Create the table - Class.forName("org.apache.derby.jdbc.EmbeddedDriver") - def con = DriverManager.getConnection("jdbc:derby:${DB_LOCATION};create=true") - final Statement stmt = con.createStatement() - try { - stmt.execute("drop table TESTTABLE") - } catch (final SQLException sqle) { - // Ignore, usually due to Derby not having DROP TABLE IF EXISTS - } - try { - stmt.executeUpdate('CREATE TABLE testTable (field1 integer, field2 varchar(20) not null)') - } finally { - stmt.close() - } - - List recordFields = Arrays.asList( - new RecordField("field1", RecordFieldType.INT.getDataType()) - ) - RecordSchema recordSchema = new SimpleRecordSchema(recordFields) - - Map row1 = new HashMap<>() - row1.put('field1', 15) - row1.put('field2', 'Hello') - row1.put('field3', 'fail') - - RecordSet recordSet = new ListRecordSet(recordSchema, Collections.singletonList(new MapRecord(recordSchema, row1))) - assertThrows(IOException.class, { task.sendData(recordSet, new HashMap<>(), true) } as Executable, - 'Should have generated an exception for column not present') - } - - @Test - void testMissingColumn() throws IOException, InitializationException { - DatabaseRecordSink task = initTask('TESTTABLE') - - // Create the table - Class.forName("org.apache.derby.jdbc.EmbeddedDriver") - def con = DriverManager.getConnection("jdbc:derby:${DB_LOCATION};create=true") - final Statement stmt = con.createStatement() - try { - stmt.execute("drop table TESTTABLE") - } catch (final SQLException sqle) { - // Ignore, usually due to Derby not having DROP TABLE IF EXISTS - } - try { - stmt.executeUpdate('CREATE TABLE testTable (field1 integer, field2 varchar(20))') - } finally { - stmt.close() - } - - List recordFields = Arrays.asList( - new RecordField("field1", RecordFieldType.INT.getDataType()), - new RecordField("field2", RecordFieldType.STRING.getDataType()), - new RecordField("field3", RecordFieldType.STRING.getDataType()) - ) - RecordSchema recordSchema = new SimpleRecordSchema(recordFields) - - Map row1 = new HashMap<>() - row1.put('field1', 15) - - RecordSet recordSet = new ListRecordSet(recordSchema, Collections.singletonList(new MapRecord(recordSchema, row1))) - assertThrows(IOException.class, { task.sendData(recordSet, new HashMap<>(), true) } as Executable, - 'Should have generated an exception for field not present') - } - - DatabaseRecordSink initTask(String tableName) throws InitializationException, IOException { - - final ComponentLog logger = mock(ComponentLog.class) - final DatabaseRecordSink task = new DatabaseRecordSink() - context = mock(ConfigurationContext.class) - final StateManager stateManager = new MockStateManager(task) - - final PropertyValue pValue = mock(StandardPropertyValue.class) - final MockRecordWriter writer = new MockRecordWriter(null, false) // No header, don't quote values - when(context.getProperty(RecordSinkService.RECORD_WRITER_FACTORY)).thenReturn(pValue) - when(pValue.asControllerService(RecordSetWriterFactory.class)).thenReturn(writer) - when(context.getProperty(DatabaseRecordSink.CATALOG_NAME)).thenReturn(new MockPropertyValue(null)) - when(context.getProperty(DatabaseRecordSink.SCHEMA_NAME)).thenReturn(new MockPropertyValue(null)) - when(context.getProperty(DatabaseRecordSink.TABLE_NAME)).thenReturn(new MockPropertyValue(tableName ?: 'TESTTABLE')) - when(context.getProperty(DatabaseRecordSink.QUOTED_IDENTIFIERS)).thenReturn(new MockPropertyValue('false')) - when(context.getProperty(DatabaseRecordSink.QUOTED_TABLE_IDENTIFIER)).thenReturn(new MockPropertyValue('true')) - when(context.getProperty(DatabaseRecordSink.QUERY_TIMEOUT)).thenReturn(new MockPropertyValue('5 sec')) - when(context.getProperty(DatabaseRecordSink.TRANSLATE_FIELD_NAMES)).thenReturn(new MockPropertyValue('true')) - when(context.getProperty(DatabaseRecordSink.UNMATCHED_FIELD_BEHAVIOR)).thenReturn(new MockPropertyValue(DatabaseRecordSink.FAIL_UNMATCHED_FIELD.value)) - when(context.getProperty(DatabaseRecordSink.UNMATCHED_COLUMN_BEHAVIOR)).thenReturn(new MockPropertyValue(DatabaseRecordSink.FAIL_UNMATCHED_COLUMN.value)) - - // Set up the DBCPService to connect to a temp H2 database - dbcpService = new DBCPConnectionPool() - when(pValue.asControllerService(DBCPService.class)).thenReturn(dbcpService) - when(context.getProperty(DatabaseRecordSink.DBCP_SERVICE)).thenReturn(pValue) - - final ConfigurationContext dbContext = mock(ConfigurationContext.class) - final StateManager dbStateManager = new MockStateManager(dbcpService) - - when(dbContext.getProperty(DATABASE_URL)).thenReturn(new MockPropertyValue("jdbc:derby:${DB_LOCATION}")) - when(dbContext.getProperty(DB_USER)).thenReturn(new MockPropertyValue(null)) - when(dbContext.getProperty(DB_PASSWORD)).thenReturn(new MockPropertyValue(null)) - when(dbContext.getProperty(DB_DRIVERNAME)).thenReturn(new MockPropertyValue('org.apache.derby.jdbc.EmbeddedDriver')) - when(dbContext.getProperty(DB_DRIVER_LOCATION)).thenReturn(new MockPropertyValue('')) - when(dbContext.getProperty(MAX_TOTAL_CONNECTIONS)).thenReturn(new MockPropertyValue('1')) - when(dbContext.getProperty(VALIDATION_QUERY)).thenReturn(new MockPropertyValue('')) - when(dbContext.getProperty(MAX_WAIT_TIME)).thenReturn(new MockPropertyValue('5 sec')) - when(dbContext.getProperty(MIN_IDLE)).thenReturn(new MockPropertyValue('0')) - when(dbContext.getProperty(MAX_IDLE)).thenReturn(new MockPropertyValue('0')) - when(dbContext.getProperty(MAX_CONN_LIFETIME)).thenReturn(new MockPropertyValue('5 sec')) - when(dbContext.getProperty(EVICTION_RUN_PERIOD)).thenReturn(new MockPropertyValue('5 sec')) - when(dbContext.getProperty(MIN_EVICTABLE_IDLE_TIME)).thenReturn(new MockPropertyValue('5 sec')) - when(dbContext.getProperty(SOFT_MIN_EVICTABLE_IDLE_TIME)).thenReturn(new MockPropertyValue('5 sec')) - when(dbContext.getProperty(KERBEROS_CREDENTIALS_SERVICE)).thenReturn(new MockPropertyValue(null)) - when(dbContext.getProperty(KERBEROS_USER_SERVICE)).thenReturn(new MockPropertyValue(null)) - when(dbContext.getProperty(KERBEROS_PRINCIPAL)).thenReturn(new MockPropertyValue(null)) - when(dbContext.getProperty(KERBEROS_PASSWORD)).thenReturn(new MockPropertyValue(null)) - - final ControllerServiceInitializationContext dbInitContext = new MockControllerServiceInitializationContext(dbcpService, UUID.randomUUID().toString(), logger, dbStateManager) - dbcpService.initialize(dbInitContext) - dbcpService.onConfigured(dbContext) - - final ControllerServiceInitializationContext initContext = new MockControllerServiceInitializationContext(writer, UUID.randomUUID().toString(), logger, stateManager) - task.initialize(initContext) - task.onEnabled(context) - - return task - } -} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/test/java/org/apache/nifi/record/sink/db/DatabaseRecordSinkTest.java b/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/test/java/org/apache/nifi/record/sink/db/DatabaseRecordSinkTest.java new file mode 100644 index 0000000000..d8cc5d0873 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/test/java/org/apache/nifi/record/sink/db/DatabaseRecordSinkTest.java @@ -0,0 +1,363 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.record.sink.db; + +import org.apache.nifi.attribute.expression.language.StandardPropertyValue; +import org.apache.nifi.components.PropertyValue; +import org.apache.nifi.components.state.StateManager; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.controller.ControllerServiceInitializationContext; +import org.apache.nifi.dbcp.DBCPConnectionPool; +import org.apache.nifi.dbcp.DBCPService; +import org.apache.nifi.dbcp.utils.DBCPProperties; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.record.sink.RecordSinkService; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.serialization.RecordSetWriterFactory; +import org.apache.nifi.serialization.SimpleRecordSchema; +import org.apache.nifi.serialization.WriteResult; +import org.apache.nifi.serialization.record.ListRecordSet; +import org.apache.nifi.serialization.record.MapRecord; +import org.apache.nifi.serialization.record.MockRecordWriter; +import org.apache.nifi.serialization.record.RecordField; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.RecordSet; +import org.apache.nifi.state.MockStateManager; +import org.apache.nifi.util.MockControllerServiceInitializationContext; +import org.apache.nifi.util.MockPropertyValue; +import org.apache.nifi.util.NoOpProcessor; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.apache.nifi.util.file.FileUtils; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Paths; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +import static org.apache.nifi.dbcp.DBCPConnectionPool.KERBEROS_CREDENTIALS_SERVICE; +import static org.apache.nifi.dbcp.DBCPConnectionPool.KERBEROS_PASSWORD; +import static org.apache.nifi.dbcp.DBCPConnectionPool.KERBEROS_PRINCIPAL; +import static org.apache.nifi.dbcp.utils.DBCPProperties.DATABASE_URL; +import static org.apache.nifi.dbcp.utils.DBCPProperties.DB_DRIVERNAME; +import static org.apache.nifi.dbcp.utils.DBCPProperties.DB_DRIVER_LOCATION; +import static org.apache.nifi.dbcp.utils.DBCPProperties.DB_PASSWORD; +import static org.apache.nifi.dbcp.utils.DBCPProperties.DB_USER; +import static org.apache.nifi.dbcp.utils.DBCPProperties.EVICTION_RUN_PERIOD; +import static org.apache.nifi.dbcp.utils.DBCPProperties.KERBEROS_USER_SERVICE; +import static org.apache.nifi.dbcp.utils.DBCPProperties.MAX_CONN_LIFETIME; +import static org.apache.nifi.dbcp.utils.DBCPProperties.MAX_IDLE; +import static org.apache.nifi.dbcp.utils.DBCPProperties.MAX_TOTAL_CONNECTIONS; +import static org.apache.nifi.dbcp.utils.DBCPProperties.MAX_WAIT_TIME; +import static org.apache.nifi.dbcp.utils.DBCPProperties.MIN_EVICTABLE_IDLE_TIME; +import static org.apache.nifi.dbcp.utils.DBCPProperties.MIN_IDLE; +import static org.apache.nifi.dbcp.utils.DBCPProperties.SOFT_MIN_EVICTABLE_IDLE_TIME; +import static org.apache.nifi.dbcp.utils.DBCPProperties.VALIDATION_QUERY; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class DatabaseRecordSinkTest { + + private static final String SERVICE_ID = DBCPConnectionPool.class.getName(); + private static final String DERBY_LOG_PROPERTY = "derby.stream.error.file"; + private static final String DERBY_SHUTDOWN_STATE = "XJ015"; + private static final String DB_LOCATION = "target/db"; + + private TestRunner runner; + private DBCPConnectionPool dbcpService; + private File databaseDirectory; + + @BeforeAll + public static void setDerbyLog() { + final File derbyLog = new File(getSystemTemporaryDirectory(), "derby.log"); + derbyLog.deleteOnExit(); + System.setProperty(DERBY_LOG_PROPERTY, derbyLog.getAbsolutePath()); + } + + @AfterAll + public static void clearDerbyLog() { + System.clearProperty(DERBY_LOG_PROPERTY); + } + + @BeforeEach + public void setService() throws InitializationException { + databaseDirectory = getEmptyDirectory(); + + dbcpService = new DBCPConnectionPool(); + runner = TestRunners.newTestRunner(NoOpProcessor.class); + runner.addControllerService(SERVICE_ID, dbcpService); + + final String url = String.format("jdbc:derby:%s;create=true", databaseDirectory); + runner.setProperty(dbcpService, DBCPProperties.DATABASE_URL, url); + runner.setProperty(dbcpService, DBCPProperties.DB_USER, String.class.getSimpleName()); + runner.setProperty(dbcpService, DBCPProperties.DB_PASSWORD, String.class.getName()); + runner.setProperty(dbcpService, DBCPProperties.DB_DRIVERNAME, "org.apache.derby.jdbc.EmbeddedDriver"); + } + + @AfterEach + public void shutdown() throws IOException { + if (databaseDirectory.exists()) { + final SQLException exception = assertThrows(SQLException.class, () -> DriverManager.getConnection("jdbc:derby:;shutdown=true")); + assertEquals(DERBY_SHUTDOWN_STATE, exception.getSQLState()); + FileUtils.deleteFile(databaseDirectory, true); + } + } + + private ConfigurationContext context; + + @Test + void testRecordFormat() throws IOException, InitializationException, SQLException { + final DatabaseRecordSink task = initTask("TESTTABLE"); + + // Create the table + final Connection con = DriverManager.getConnection(String.format("jdbc:derby:%s;create=true", DB_LOCATION)); + final Statement stmt = con.createStatement(); + try { + stmt.execute("drop table TESTTABLE"); + } catch (final SQLException sqle) { + // Ignore, usually due to Derby not having DROP TABLE IF EXISTS + } + try { + stmt.executeUpdate("CREATE TABLE testTable (field1 integer, field2 varchar(20))"); + } finally { + stmt.close(); + } + + final List recordFields = Arrays.asList( + new RecordField("field1", RecordFieldType.INT.getDataType()), + new RecordField("field2", RecordFieldType.STRING.getDataType()) + ); + final RecordSchema recordSchema = new SimpleRecordSchema(recordFields); + + final Map row1 = new HashMap<>(); + row1.put("field1", 15); + row1.put("field2", "Hello"); + + final Map row2 = new HashMap<>(); + row2.put("field1", 6); + row2.put("field2", "World!"); + + final RecordSet recordSet = new ListRecordSet(recordSchema, Arrays.asList( + new MapRecord(recordSchema, row1), + new MapRecord(recordSchema, row2) + )); + + final WriteResult writeResult = task.sendData(recordSet, Collections.singletonMap("a", "Hello"), true); + assertNotNull(writeResult); + assertEquals(2, writeResult.getRecordCount()); + assertEquals("Hello", writeResult.getAttributes().get("a")); + + final Statement st = con.createStatement(); + final ResultSet resultSet = st.executeQuery("select * from testTable"); + assertTrue(resultSet.next()); + + Object f1 = resultSet.getObject(1); + assertNotNull(f1); + assertTrue(f1 instanceof Integer); + assertEquals(15, f1); + Object f2 = resultSet.getObject(2); + assertNotNull(f2); + assertTrue(f2 instanceof String); + assertEquals("Hello", f2); + + assertTrue(resultSet.next()); + + f1 = resultSet.getObject(1); + assertNotNull(f1); + assertTrue(f1 instanceof Integer); + assertEquals(6, f1); + f2 = resultSet.getObject(2); + assertNotNull(f2); + assertTrue(f2 instanceof String); + assertEquals("World!", f2); + + assertFalse(resultSet.next()); + } + + @Test + void testMissingTable() throws InitializationException { + final DatabaseRecordSink task = initTask("NO_SUCH_TABLE"); + + final List recordFields = Arrays.asList( + new RecordField("field1", RecordFieldType.INT.getDataType()), + new RecordField("field2", RecordFieldType.STRING.getDataType()) + ); + final RecordSchema recordSchema = new SimpleRecordSchema(recordFields); + + final Map row1 = new HashMap<>(); + row1.put("field1", 15); + row1.put("field2", "Hello"); + + final RecordSet recordSet = new ListRecordSet(recordSchema, Collections.singletonList(new MapRecord(recordSchema, row1))); + assertThrows(IOException.class, () -> task.sendData(recordSet, new HashMap<>(), true), + "Should have generated an exception for table not present"); + } + + @Test + void testMissingField() throws InitializationException, SQLException { + final DatabaseRecordSink task = initTask("TESTTABLE"); + + // Create the table + final Connection con = DriverManager.getConnection(String.format("jdbc:derby:%s;create=true", DB_LOCATION)); + final Statement stmt = con.createStatement(); + try { + stmt.execute("drop table TESTTABLE"); + } catch (final SQLException sqle) { + // Ignore, usually due to Derby not having DROP TABLE IF EXISTS + } + try { + stmt.executeUpdate("CREATE TABLE testTable (field1 integer, field2 varchar(20) not null)"); + } finally { + stmt.close(); + } + + final List recordFields = Arrays.asList( + new RecordField("field1", RecordFieldType.INT.getDataType()) + ); + final RecordSchema recordSchema = new SimpleRecordSchema(recordFields); + + final Map row1 = new HashMap<>(); + row1.put("field1", 15); + row1.put("field2", "Hello"); + row1.put("field3", "fail"); + + final RecordSet recordSet = new ListRecordSet(recordSchema, Collections.singletonList(new MapRecord(recordSchema, row1))); + assertThrows(IOException.class, () -> task.sendData(recordSet, new HashMap<>(), true), + "Should have generated an exception for column not present"); + + } + + @Test + void testMissingColumn() throws InitializationException, SQLException { + final DatabaseRecordSink task = initTask("TESTTABLE"); + + // Create the table + final Connection con = DriverManager.getConnection(String.format("jdbc:derby:%s;create=true", DB_LOCATION)); + final Statement stmt = con.createStatement(); + try { + stmt.execute("drop table TESTTABLE"); + } catch (final SQLException sqle) { + // Ignore, usually due to Derby not having DROP TABLE IF EXISTS + } + try { + stmt.executeUpdate("CREATE TABLE testTable (field1 integer, field2 varchar(20))"); + } finally { + stmt.close(); + } + + final List recordFields = Arrays.asList( + new RecordField("field1", RecordFieldType.INT.getDataType()), + new RecordField("field2", RecordFieldType.STRING.getDataType()), + new RecordField("field3", RecordFieldType.STRING.getDataType()) + ); + final RecordSchema recordSchema = new SimpleRecordSchema(recordFields); + + final Map row1 = new HashMap<>(); + row1.put("field1", 15); + + final RecordSet recordSet = new ListRecordSet(recordSchema, Collections.singletonList(new MapRecord(recordSchema, row1))); + assertThrows(IOException.class, () -> task.sendData(recordSet, new HashMap<>(), true), + "Should have generated an exception for field not present"); + } + + private DatabaseRecordSink initTask(String tableName) throws InitializationException { + final ComponentLog logger = mock(ComponentLog.class); + final DatabaseRecordSink task = new DatabaseRecordSink(); + context = mock(ConfigurationContext.class); + final StateManager stateManager = new MockStateManager(task); + + final PropertyValue pValue = mock(StandardPropertyValue.class); + final MockRecordWriter writer = new MockRecordWriter(null, false); // No header, don"t quote values + when(context.getProperty(RecordSinkService.RECORD_WRITER_FACTORY)).thenReturn(pValue); + when(pValue.asControllerService(RecordSetWriterFactory.class)).thenReturn(writer); + when(context.getProperty(DatabaseRecordSink.CATALOG_NAME)).thenReturn(new MockPropertyValue(null)); + when(context.getProperty(DatabaseRecordSink.SCHEMA_NAME)).thenReturn(new MockPropertyValue(null)); + when(context.getProperty(DatabaseRecordSink.TABLE_NAME)).thenReturn(new MockPropertyValue(tableName != null ? tableName : "TESTTABLE")); + when(context.getProperty(DatabaseRecordSink.QUOTED_IDENTIFIERS)).thenReturn(new MockPropertyValue("false")); + when(context.getProperty(DatabaseRecordSink.QUOTED_TABLE_IDENTIFIER)).thenReturn(new MockPropertyValue("true")); + when(context.getProperty(DatabaseRecordSink.QUERY_TIMEOUT)).thenReturn(new MockPropertyValue("5 sec")); + when(context.getProperty(DatabaseRecordSink.TRANSLATE_FIELD_NAMES)).thenReturn(new MockPropertyValue("true")); + when(context.getProperty(DatabaseRecordSink.UNMATCHED_FIELD_BEHAVIOR)).thenReturn(new MockPropertyValue(DatabaseRecordSink.FAIL_UNMATCHED_FIELD.getValue())); + when(context.getProperty(DatabaseRecordSink.UNMATCHED_COLUMN_BEHAVIOR)).thenReturn(new MockPropertyValue(DatabaseRecordSink.FAIL_UNMATCHED_COLUMN.getValue())); + + // Set up the DBCPService to connect to a temp H2 database + when(pValue.asControllerService(DBCPService.class)).thenReturn(dbcpService); + when(context.getProperty(DatabaseRecordSink.DBCP_SERVICE)).thenReturn(pValue); + + final ConfigurationContext dbContext = mock(ConfigurationContext.class); + final StateManager dbStateManager = new MockStateManager(dbcpService); + + when(dbContext.getProperty(DATABASE_URL)).thenReturn(new MockPropertyValue(String.format("jdbc:derby:%s", DB_LOCATION))); + when(dbContext.getProperty(DB_USER)).thenReturn(new MockPropertyValue(null)); + when(dbContext.getProperty(DB_PASSWORD)).thenReturn(new MockPropertyValue(null)); + when(dbContext.getProperty(DB_DRIVERNAME)).thenReturn(new MockPropertyValue("org.apache.derby.jdbc.EmbeddedDriver")); + when(dbContext.getProperty(DB_DRIVER_LOCATION)).thenReturn(new MockPropertyValue("")); + when(dbContext.getProperty(MAX_TOTAL_CONNECTIONS)).thenReturn(new MockPropertyValue("1")); + when(dbContext.getProperty(VALIDATION_QUERY)).thenReturn(new MockPropertyValue("")); + when(dbContext.getProperty(MAX_WAIT_TIME)).thenReturn(new MockPropertyValue("5 sec")); + when(dbContext.getProperty(MIN_IDLE)).thenReturn(new MockPropertyValue("0")); + when(dbContext.getProperty(MAX_IDLE)).thenReturn(new MockPropertyValue("0")); + when(dbContext.getProperty(MAX_CONN_LIFETIME)).thenReturn(new MockPropertyValue("5 sec")); + when(dbContext.getProperty(EVICTION_RUN_PERIOD)).thenReturn(new MockPropertyValue("5 sec")); + when(dbContext.getProperty(MIN_EVICTABLE_IDLE_TIME)).thenReturn(new MockPropertyValue("5 sec")); + when(dbContext.getProperty(SOFT_MIN_EVICTABLE_IDLE_TIME)).thenReturn(new MockPropertyValue("5 sec")); + when(dbContext.getProperty(KERBEROS_CREDENTIALS_SERVICE)).thenReturn(new MockPropertyValue(null)); + when(dbContext.getProperty(KERBEROS_USER_SERVICE)).thenReturn(new MockPropertyValue(null)); + when(dbContext.getProperty(KERBEROS_PRINCIPAL)).thenReturn(new MockPropertyValue(null)); + when(dbContext.getProperty(KERBEROS_PASSWORD)).thenReturn(new MockPropertyValue(null)); + + final ControllerServiceInitializationContext dbInitContext = new MockControllerServiceInitializationContext(dbcpService, UUID.randomUUID().toString(), logger, dbStateManager); + dbcpService.initialize(dbInitContext); + dbcpService.onConfigured(dbContext); + + final ControllerServiceInitializationContext initContext = new MockControllerServiceInitializationContext(writer, UUID.randomUUID().toString(), logger, stateManager); + task.initialize(initContext); + task.onEnabled(context); + + return task; + } + + private File getEmptyDirectory() { + final String randomDirectory = String.format("%s-%s", getClass().getSimpleName(), UUID.randomUUID()); + return Paths.get(getSystemTemporaryDirectory(), randomDirectory).toFile(); + } + + private static String getSystemTemporaryDirectory() { + return System.getProperty("java.io.tmpdir"); + } +}