diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java index 07e6fe9ed0..6647c4cf12 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java @@ -44,6 +44,7 @@ import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.io.OutputStreamCallback; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processors.standard.util.JdbcCommon; +import org.apache.nifi.util.LongHolder; import org.apache.nifi.util.StopWatch; @EventDriven @@ -62,7 +63,7 @@ public class ExecuteSQL extends AbstractProcessor { .description("SQL query execution failed. Incoming FlowFile will be penalized and routed to this relationship") .build(); private final Set relationships; - + public static final PropertyDescriptor DBCP_SERVICE = new PropertyDescriptor.Builder() .name("Database Connection Pooling Service") .description("The Controller Service that is used to obtain connection to database") @@ -91,11 +92,11 @@ public class ExecuteSQL extends AbstractProcessor { private final List propDescriptors; public ExecuteSQL() { - HashSet r = new HashSet<>(); + final Set r = new HashSet<>(); r.add(REL_SUCCESS); relationships = Collections.unmodifiableSet(r); - ArrayList pds = new ArrayList<>(); + final List pds = new ArrayList<>(); pds.add(DBCP_SERVICE); pds.add(SQL_SELECT_QUERY); pds.add(QUERY_TIMEOUT); @@ -113,7 +114,7 @@ public class ExecuteSQL extends AbstractProcessor { } @Override - public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { FlowFile incoming = session.get(); if (incoming == null) { return; @@ -126,44 +127,30 @@ public class ExecuteSQL extends AbstractProcessor { final Integer queryTimeout = context.getProperty(QUERY_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).intValue(); final StopWatch stopWatch = new StopWatch(true); - - try { - final Connection con = dbcpService.getConnection(); - try { - final Statement st = con.createStatement(); - try { - st.setQueryTimeout(queryTimeout); // timeout in seconds - FlowFile outgoing = session.write(incoming, new OutputStreamCallback() { - @Override - public void process(final OutputStream out) throws IOException { - try { - logger.info("start executing query {}", new Object[]{selectQuery}); - ResultSet resultSet = st.executeQuery(selectQuery); - Long nrOfRows = JdbcCommon.convertToAvroStream(resultSet, out); - logger.info("Result FlowFile contain {} Avro records", new Object[]{nrOfRows}); - - } catch (SQLException e) { - throw new ProcessException(e); - } - } - }); - - logger.info("Transferred {} to 'success'", new Object[]{outgoing}); - session.getProvenanceReporter().modifyContent(outgoing, stopWatch.getElapsed(TimeUnit.MILLISECONDS)); - session.transfer(outgoing, REL_SUCCESS); - } finally { - st.close(); - } - } finally { - // return connection to pool - con.close(); - } - } catch (ProcessException e) { - logger.error("Unable to execute sql select query due to {}", new Object[]{e}); - session.transfer(incoming, 
REL_FAILURE); - } catch (SQLException e) { - logger.error("Unable to execute sql select query due to {}", new Object[]{e}); + try (final Connection con = dbcpService.getConnection(); + final Statement st = con.createStatement()) { + st.setQueryTimeout(queryTimeout); // timeout in seconds + final LongHolder nrOfRows = new LongHolder(0L); + FlowFile outgoing = session.write(incoming, new OutputStreamCallback() { + @Override + public void process(final OutputStream out) throws IOException { + try { + logger.debug("Executing query {}", new Object[] { selectQuery }); + final ResultSet resultSet = st.executeQuery(selectQuery); + nrOfRows.set(JdbcCommon.convertToAvroStream(resultSet, out)); + } catch (final SQLException e) { + throw new ProcessException(e); + } + } + }); + + logger.info("{} contains {} Avro records", new Object[] { outgoing, nrOfRows.get() }); + logger.info("Transferred {} to 'success'", new Object[] { outgoing }); + session.getProvenanceReporter().modifyContent(outgoing, "Retrieved " + nrOfRows.get() + " rows", stopWatch.getElapsed(TimeUnit.MILLISECONDS)); + session.transfer(outgoing, REL_SUCCESS); + } catch (final ProcessException | SQLException e) { + logger.error("Unable to execute SQL select query {} for {} due to {}; routing to failure", new Object[] { selectQuery, incoming, e }); session.transfer(incoming, REL_FAILURE); } } diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java index c9ad423b05..8dff244649 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java @@ -21,6 +21,7 @@ import java.io.OutputStream; import java.sql.ResultSet; import java.sql.ResultSetMetaData; import java.sql.SQLException; +import static java.sql.Types.*; import org.apache.avro.Schema; import org.apache.avro.SchemaBuilder; @@ -34,104 +35,102 @@ import org.apache.avro.io.DatumWriter; /** * JDBC / SQL common functions.
- * */ public class JdbcCommon { - public static long convertToAvroStream(ResultSet rs, OutputStream outStream) throws SQLException, IOException { - - Schema schema = createSchema(rs); - GenericRecord rec = new GenericData.Record(schema); - - DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<GenericRecord>(schema); - DataFileWriter<GenericRecord> dataFileWriter= new DataFileWriter<GenericRecord>(datumWriter); - dataFileWriter.create(schema, outStream); - - ResultSetMetaData meta = rs.getMetaData(); - int nrOfColumns = meta.getColumnCount(); - long nrOfRows = 0; - while (rs.next()) { - for (int i = 1; i <= nrOfColumns; i++) { - Object value = rs.getObject(i); - rec.put(i-1, value); - } - dataFileWriter.append(rec); - nrOfRows += 1; + public static long convertToAvroStream(final ResultSet rs, final OutputStream outStream) throws SQLException, IOException { + final Schema schema = createSchema(rs); + final GenericRecord rec = new GenericData.Record(schema); + + final DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<GenericRecord>(schema); + try (final DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<GenericRecord>(datumWriter)) { + dataFileWriter.create(schema, outStream); + + final ResultSetMetaData meta = rs.getMetaData(); + final int nrOfColumns = meta.getColumnCount(); + long nrOfRows = 0; + while (rs.next()) { + for (int i = 1; i <= nrOfColumns; i++) { + final Object value = rs.getObject(i); + rec.put(i - 1, value); + } + dataFileWriter.append(rec); + nrOfRows += 1; + } + + return nrOfRows; } - - dataFileWriter.close(); - return nrOfRows; } - - public static Schema createSchema(ResultSet rs) throws SQLException { - - ResultSetMetaData meta = rs.getMetaData(); - int nrOfColumns = meta.getColumnCount(); - String tableName = meta.getTableName(1); - - FieldAssembler<Schema> builder = SchemaBuilder.record(tableName).namespace("any.data").fields(); - - /** - * Some missing Avro types - Decimal, Date types. - * May need some additional work. - */ - for (int i = 1; i <= nrOfColumns; i++) + + public static Schema createSchema(final ResultSet rs) throws SQLException { + final ResultSetMetaData meta = rs.getMetaData(); + final int nrOfColumns = meta.getColumnCount(); + final String tableName = meta.getTableName(1); + + final FieldAssembler<Schema> builder = SchemaBuilder.record(tableName).namespace("any.data").fields(); + + /** + * Some missing Avro types - Decimal, Date types. May need some + * additional work.
+ */ + for (int i = 1; i <= nrOfColumns; i++) { switch (meta.getColumnType(i)) { - - case java.sql.Types.CHAR: - case java.sql.Types.LONGNVARCHAR: - case java.sql.Types.LONGVARCHAR: - case java.sql.Types.NCHAR: - case java.sql.Types.NVARCHAR: - case java.sql.Types.VARCHAR: - builder.name(meta.getColumnName(i)).type().stringType().noDefault(); - break; + case CHAR: + case LONGNVARCHAR: + case LONGVARCHAR: + case NCHAR: + case NVARCHAR: + case VARCHAR: + builder.name(meta.getColumnName(i)).type().stringType().noDefault(); + break; - case java.sql.Types.BOOLEAN: - builder.name(meta.getColumnName(i)).type().booleanType().noDefault(); - break; + case BOOLEAN: + builder.name(meta.getColumnName(i)).type().booleanType().noDefault(); + break; - case java.sql.Types.INTEGER: - case java.sql.Types.SMALLINT: - case java.sql.Types.TINYINT: - builder.name(meta.getColumnName(i)).type().intType().noDefault(); - break; + case INTEGER: + case SMALLINT: + case TINYINT: + builder.name(meta.getColumnName(i)).type().intType().noDefault(); + break; - case java.sql.Types.BIGINT: - builder.name(meta.getColumnName(i)).type().longType().noDefault(); - break; + case BIGINT: + builder.name(meta.getColumnName(i)).type().longType().noDefault(); + break; - // java.sql.RowId is interface, is seems to be database implementation specific, let's convert to String - case java.sql.Types.ROWID: - builder.name(meta.getColumnName(i)).type().stringType().noDefault(); - break; + // java.sql.RowId is interface, is seems to be database + // implementation specific, let's convert to String + case ROWID: + builder.name(meta.getColumnName(i)).type().stringType().noDefault(); + break; - case java.sql.Types.FLOAT: - case java.sql.Types.REAL: - builder.name(meta.getColumnName(i)).type().floatType().noDefault(); - break; + case FLOAT: + case REAL: + builder.name(meta.getColumnName(i)).type().floatType().noDefault(); + break; - case java.sql.Types.DOUBLE: - builder.name(meta.getColumnName(i)).type().doubleType().noDefault(); - break; + case DOUBLE: + builder.name(meta.getColumnName(i)).type().doubleType().noDefault(); + break; - // TODO Did not find direct suitable type, need to be clarified!!!! - case java.sql.Types.DECIMAL: - case java.sql.Types.NUMERIC: - builder.name(meta.getColumnName(i)).type().stringType().noDefault(); - break; + // Did not find direct suitable type, need to be clarified!!!! + case DECIMAL: + case NUMERIC: + builder.name(meta.getColumnName(i)).type().stringType().noDefault(); + break; - // TODO Did not find direct suitable type, need to be clarified!!!! - case java.sql.Types.DATE: - case java.sql.Types.TIME: - case java.sql.Types.TIMESTAMP: - builder.name(meta.getColumnName(i)).type().stringType().noDefault(); - break; + // Did not find direct suitable type, need to be clarified!!!! 
+ case DATE: + case TIME: + case TIMESTAMP: + builder.name(meta.getColumnName(i)).type().stringType().noDefault(); + break; - default: - break; + default: + break; } + } + return builder.endRecord(); } - } diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java index b9243ef3b0..30da519e5a 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java @@ -65,7 +65,7 @@ public class TestExecuteSQL { public static void setup() { System.setProperty("derby.stream.error.file", "target/derby.log"); } - + @Test public void testNoTimeLimit() throws InitializationException, ClassNotFoundException, SQLException, IOException { invokeOnTrigger(null); @@ -77,44 +77,45 @@ public class TestExecuteSQL { invokeOnTrigger(1); // 1 second max time } - public void invokeOnTrigger(Integer queryTimeout) throws InitializationException, ClassNotFoundException, SQLException, IOException { + public void invokeOnTrigger(final Integer queryTimeout) throws InitializationException, ClassNotFoundException, SQLException, IOException { final TestRunner runner = TestRunners.newTestRunner(ExecuteSQL.class); - + final DBCPService dbcp = new DBCPServiceSimpleImpl(); final Map dbcpProperties = new HashMap<>(); runner.addControllerService("dbcp", dbcp, dbcpProperties); - + runner.enableControllerService(dbcp); runner.setProperty(ExecuteSQL.DBCP_SERVICE, "dbcp"); - - if (queryTimeout!=null) + + if (queryTimeout != null) { runner.setProperty(ExecuteSQL.QUERY_TIMEOUT, queryTimeout.toString() + " secs"); - + } + // remove previous test database, if any - File dbLocation = new File(DB_LOCATION); + final File dbLocation = new File(DB_LOCATION); dbLocation.delete(); // load test data to database - Connection con = dbcp.getConnection(); + final Connection con = dbcp.getConnection(); TestJdbcHugeStream.loadTestData2Database(con, 100, 2000, 1000); - System.out.println("test data loaded"); - + LOGGER.info("test data loaded"); + // ResultSet size will be 1x2000x1000 = 2 000 000 rows // because of where PER.ID = ${person.id} final int nrOfRows = 2000000; - String query = "select " + final String query = "select " + " PER.ID as PersonId, PER.NAME as PersonName, PER.CODE as PersonCode" + ", PRD.ID as ProductId,PRD.NAME as ProductName,PRD.CODE as ProductCode" + ", REL.ID as RelId, REL.NAME as RelName, REL.CODE as RelCode" + ", ROW_NUMBER() OVER () as rownr " + " from persons PER, products PRD, relationships REL" + " where PER.ID = ${person.id}"; - + runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, query); // incoming FlowFile content is not used, but attributes are used - Map attributes = new HashMap(); + final Map attributes = new HashMap(); attributes.put("person.id", "10"); runner.enqueue("Hello".getBytes(), attributes); @@ -122,27 +123,27 @@ public class TestExecuteSQL { runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_SUCCESS, 1); // read all Avro records and verify created FlowFile contains 1000000 records - List flowfiles = runner.getFlowFilesForRelationship(ExecuteSQL.REL_SUCCESS); - InputStream in = new ByteArrayInputStream(flowfiles.get(0).toByteArray()); - DatumReader 
datumReader = new GenericDatumReader(); - DataFileStream dataFileReader = new DataFileStream(in, datumReader); + final List flowfiles = runner.getFlowFilesForRelationship(ExecuteSQL.REL_SUCCESS); + final InputStream in = new ByteArrayInputStream(flowfiles.get(0).toByteArray()); + final DatumReader datumReader = new GenericDatumReader(); + final DataFileStream dataFileReader = new DataFileStream(in, datumReader); GenericRecord record = null; long recordsFromStream = 0; while (dataFileReader.hasNext()) { // Reuse record object by passing it to next(). This saves us from // allocating and garbage collecting many objects for files with many items. record = dataFileReader.next(record); -// System.out.println(record); recordsFromStream += 1; } - System.out.println("total nr of records from stream: " + recordsFromStream); + + LOGGER.info("total nr of records from stream: " + recordsFromStream); assertEquals(nrOfRows, recordsFromStream); dataFileReader.close(); } - - - + + + /** * Simple implementation only for ExecuteSQL processor testing. * @@ -157,13 +158,13 @@ public class TestExecuteSQL { @Override public Connection getConnection() throws ProcessException { try { - Class.forName("org.apache.derby.jdbc.EmbeddedDriver"); - Connection con = DriverManager.getConnection("jdbc:derby:" + DB_LOCATION + ";create=true"); + Class.forName("org.apache.derby.jdbc.EmbeddedDriver"); + final Connection con = DriverManager.getConnection("jdbc:derby:" + DB_LOCATION + ";create=true"); return con; - } catch (Exception e) { + } catch (final Exception e) { throw new ProcessException("getConnection failed: " + e); } - } + } } - + } diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcHugeStream.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcHugeStream.java index 8c54bc08db..89c2fb3d45 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcHugeStream.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcHugeStream.java @@ -39,17 +39,18 @@ import org.apache.avro.generic.GenericDatumReader; import org.apache.avro.generic.GenericRecord; import org.apache.avro.io.DatumReader; import org.junit.BeforeClass; +import org.junit.Ignore; import org.junit.Test; /** * Test streaming using large number of result set rows. * 1. Read data from database. * 2. Create Avro schema from ResultSet meta data. - * 3. Read rows from ResultSet and write rows to Avro writer stream + * 3. Read rows from ResultSet and write rows to Avro writer stream * (Avro will create record for each row). - * 4. And finally read records from Avro stream to verify all data is present in Avro stream. - * - * + * 4. And finally read records from Avro stream to verify all data is present in Avro stream. + * + * * Sql query will return all combinations from 3 table. * For example when each table contain 1000 rows, result set will be 1 000 000 000 rows. 
* @@ -63,7 +64,7 @@ public class TestJdbcHugeStream { System.setProperty("derby.stream.error.file", "target/derby.log"); } - /** + /** * In case of large record set this will fail with * java.lang.OutOfMemoryError: Java heap space * at java.util.Arrays.copyOf(Arrays.java:2271) @@ -71,149 +72,147 @@ public class TestJdbcHugeStream { * at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93) * at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140) * at org.apache.avro.file.DataFileWriter$BufferedFileOutputStream$PositionFilter.write(DataFileWriter.java:446) - * - */ -// @Test + * + */ + @Test + @Ignore public void readSend2StreamHuge_InMemory() throws ClassNotFoundException, SQLException, IOException { - + // remove previous test database, if any - File dbLocation = new File(DB_LOCATION); + final File dbLocation = new File(DB_LOCATION); dbLocation.delete(); - Connection con = createConnection(); - loadTestData2Database(con, 150, 150, 150); - System.out.println("test data loaded"); - - Statement st = con.createStatement(); - - // Notice! - // Following select is deliberately invalid! - // For testing we need huge amount of rows, so where part is not used. - ResultSet resultSet = st.executeQuery("select " - + " PER.ID as PersonId, PER.NAME as PersonName, PER.CODE as PersonCode" - + ", PRD.ID as ProductId,PRD.NAME as ProductName,PRD.CODE as ProductCode" - + ", REL.ID as RelId, REL.NAME as RelName, REL.CODE as RelCode" - + ", ROW_NUMBER() OVER () as rownr " - + " from persons PER, products PRD, relationships REL"); + try (final Connection con = createConnection()) { + loadTestData2Database(con, 150, 150, 150); + System.out.println("test data loaded"); - ByteArrayOutputStream outStream = new ByteArrayOutputStream(); - long nrOfRows = JdbcCommon.convertToAvroStream(resultSet, outStream); - System.out.println("total nr of rows in resultset: " + nrOfRows); + try (final Statement st = con.createStatement()) { + // Notice! + // Following select is deliberately invalid! + // For testing we need huge amount of rows, so where part is not + // used. + final ResultSet resultSet = st.executeQuery("select " + + " PER.ID as PersonId, PER.NAME as PersonName, PER.CODE as PersonCode" + + ", PRD.ID as ProductId,PRD.NAME as ProductName,PRD.CODE as ProductCode" + + ", REL.ID as RelId, REL.NAME as RelName, REL.CODE as RelCode" + + ", ROW_NUMBER() OVER () as rownr " + + " from persons PER, products PRD, relationships REL"); - byte[] serializedBytes = outStream.toByteArray(); - assertNotNull(serializedBytes); - System.out.println("Avro serialized result size in bytes: " + serializedBytes.length); + final ByteArrayOutputStream outStream = new ByteArrayOutputStream(); + final long nrOfRows = JdbcCommon.convertToAvroStream(resultSet, outStream); + System.out.println("total nr of rows in resultset: " + nrOfRows); - // Deserialize bytes to records - - InputStream instream = new ByteArrayInputStream(serializedBytes); - - DatumReader datumReader = new GenericDatumReader(); - DataFileStream dataFileReader = new DataFileStream(instream, datumReader); - GenericRecord record = null; - long recordsFromStream = 0; - while (dataFileReader.hasNext()) { - // Reuse record object by passing it to next(). This saves us from - // allocating and garbage collecting many objects for files with many items. 
- record = dataFileReader.next(record); -// System.out.println(record); - recordsFromStream += 1; + final byte[] serializedBytes = outStream.toByteArray(); + assertNotNull(serializedBytes); + System.out.println("Avro serialized result size in bytes: " + serializedBytes.length); + + // Deserialize bytes to records + + final InputStream instream = new ByteArrayInputStream(serializedBytes); + + final DatumReader datumReader = new GenericDatumReader(); + try (final DataFileStream dataFileReader = new DataFileStream(instream, datumReader)) { + GenericRecord record = null; + long recordsFromStream = 0; + while (dataFileReader.hasNext()) { + // Reuse record object by passing it to next(). This + // saves us from + // allocating and garbage collecting many objects for + // files with many items. + record = dataFileReader.next(record); + recordsFromStream += 1; + } + System.out.println("total nr of records from stream: " + recordsFromStream); + assertEquals(nrOfRows, recordsFromStream); + } + } } - System.out.println("total nr of records from stream: " + recordsFromStream); - assertEquals(nrOfRows, recordsFromStream); - st.close(); - con.close(); } - + @Test public void readSend2StreamHuge_FileBased() throws ClassNotFoundException, SQLException, IOException { - + // remove previous test database, if any - File dbLocation = new File(DB_LOCATION); + final File dbLocation = new File(DB_LOCATION); dbLocation.delete(); - Connection con = createConnection(); + try (final Connection con = createConnection()) { loadTestData2Database(con, 300, 300, 300); - System.out.println("test data loaded"); - - Statement st = con.createStatement(); - - // Notice! - // Following select is deliberately invalid! - // For testing we need huge amount of rows, so where part is not used. - ResultSet resultSet = st.executeQuery("select " - + " PER.ID as PersonId, PER.NAME as PersonName, PER.CODE as PersonCode" - + ", PRD.ID as ProductId,PRD.NAME as ProductName,PRD.CODE as ProductCode" - + ", REL.ID as RelId, REL.NAME as RelName, REL.CODE as RelCode" - + ", ROW_NUMBER() OVER () as rownr " - + " from persons PER, products PRD, relationships REL"); - OutputStream outStream = new FileOutputStream("target/data.avro"); - long nrOfRows = JdbcCommon.convertToAvroStream(resultSet, outStream); - System.out.println("total nr of rows in resultset: " + nrOfRows); -/* - byte[] serializedBytes = outStream.toByteArray(); - assertNotNull(serializedBytes); - System.out.println("Avro serialized result size in bytes: " + serializedBytes.length); -*/ - // Deserialize bytes to records - - InputStream instream = new FileInputStream("target/data.avro"); - - DatumReader datumReader = new GenericDatumReader(); - DataFileStream dataFileReader = new DataFileStream(instream, datumReader); - GenericRecord record = null; - long recordsFromStream = 0; - while (dataFileReader.hasNext()) { - // Reuse record object by passing it to next(). This saves us from - // allocating and garbage collecting many objects for files with many items. - record = dataFileReader.next(record); -// System.out.println(record); - recordsFromStream += 1; + try (final Statement st = con.createStatement()) { + // Notice! + // Following select is deliberately invalid! + // For testing we need huge amount of rows, so where part is not + // used. 
+ final ResultSet resultSet = st.executeQuery("select " + + " PER.ID as PersonId, PER.NAME as PersonName, PER.CODE as PersonCode" + + ", PRD.ID as ProductId,PRD.NAME as ProductName,PRD.CODE as ProductCode" + + ", REL.ID as RelId, REL.NAME as RelName, REL.CODE as RelCode" + + ", ROW_NUMBER() OVER () as rownr " + + " from persons PER, products PRD, relationships REL"); + + final OutputStream outStream = new FileOutputStream("target/data.avro"); + final long nrOfRows = JdbcCommon.convertToAvroStream(resultSet, outStream); + + // Deserialize bytes to records + final InputStream instream = new FileInputStream("target/data.avro"); + + final DatumReader datumReader = new GenericDatumReader(); + try (final DataFileStream dataFileReader = new DataFileStream(instream, datumReader)) { + GenericRecord record = null; + long recordsFromStream = 0; + while (dataFileReader.hasNext()) { + // Reuse record object by passing it to next(). This + // saves us from + // allocating and garbage collecting many objects for + // files with many items. + record = dataFileReader.next(record); + recordsFromStream += 1; + } + System.out.println("total nr of records from stream: " + recordsFromStream); + assertEquals(nrOfRows, recordsFromStream); + } + } } - System.out.println("total nr of records from stream: " + recordsFromStream); - assertEquals(nrOfRows, recordsFromStream); - st.close(); - con.close(); } - + //================================================ helpers =============================================== - - static String dropPersons = "drop table persons"; - static String dropProducts = "drop table products"; - static String dropRelationships= "drop table relationships"; - static String createPersons = "create table persons (id integer, name varchar(100), code integer)"; - static String createProducts = "create table products (id integer, name varchar(100), code integer)"; - static String createRelationships = "create table relationships(id integer,name varchar(100), code integer)"; + + static String dropPersons = "drop table persons"; + static String dropProducts = "drop table products"; + static String dropRelationships = "drop table relationships"; + static String createPersons = "create table persons (id integer, name varchar(100), code integer)"; + static String createProducts = "create table products (id integer, name varchar(100), code integer)"; + static String createRelationships = "create table relationships (id integer,name varchar(100), code integer)"; static public void loadTestData2Database(Connection con, int nrOfPersons, int nrOfProducts, int nrOfRels) throws ClassNotFoundException, SQLException { - + System.out.println(createRandomName()); System.out.println(createRandomName()); System.out.println(createRandomName()); - - Statement st = con.createStatement(); + + final Statement st = con.createStatement(); // tables may not exist, this is not serious problem. 
try { st.executeUpdate(dropPersons); - } catch (Exception e) { } - + } catch (final Exception e) { } + try { st.executeUpdate(dropProducts); - } catch (Exception e) { } - + } catch (final Exception e) { } + try { st.executeUpdate(dropRelationships); - } catch (Exception e) { } + } catch (final Exception e) { } st.executeUpdate(createPersons); st.executeUpdate(createProducts); st.executeUpdate(createRelationships); - + for (int i = 0; i < nrOfPersons; i++) loadPersons(st, i); - + for (int i = 0; i < nrOfProducts; i++) loadProducts(st, i); - + for (int i = 0; i < nrOfRels; i++) loadRelationships(st, i); @@ -223,41 +222,37 @@ public class TestJdbcHugeStream { static Random rng = new Random(53495); static private void loadPersons(Statement st, int nr) throws SQLException { - - st.executeUpdate("insert into persons values (" + nr + ", '" + createRandomName() + "', " + rng.nextInt(469946) + ")" ); + st.executeUpdate("insert into persons values (" + nr + ", '" + createRandomName() + "', " + rng.nextInt(469946) + ")" ); } static private void loadProducts(Statement st, int nr) throws SQLException { - - st.executeUpdate("insert into products values (" + nr + ", '" + createRandomName() + "', " + rng.nextInt(469946) + ")" ); + st.executeUpdate("insert into products values (" + nr + ", '" + createRandomName() + "', " + rng.nextInt(469946) + ")" ); } static private void loadRelationships(Statement st, int nr) throws SQLException { - - st.executeUpdate("insert into relationships values (" + nr + ", '" + createRandomName() + "', " + rng.nextInt(469946) + ")" ); + st.executeUpdate("insert into relationships values (" + nr + ", '" + createRandomName() + "', " + rng.nextInt(469946) + ")" ); } static private String createRandomName() { return createRandomString() + " " + createRandomString(); } - + static private String createRandomString() { - - int length = rng.nextInt(19); - String characters = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"; - - char[] text = new char[length]; + + final int length = rng.nextInt(19); + final String characters = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"; + + final char[] text = new char[length]; for (int i = 0; i < length; i++) { text[i] = characters.charAt(rng.nextInt(characters.length())); } - return new String(text); + return new String(text); } - + private Connection createConnection() throws ClassNotFoundException, SQLException { - - Class.forName("org.apache.derby.jdbc.EmbeddedDriver"); - Connection con = DriverManager.getConnection("jdbc:derby:" + DB_LOCATION + ";create=true"); + Class.forName("org.apache.derby.jdbc.EmbeddedDriver"); + final Connection con = DriverManager.getConnection("jdbc:derby:" + DB_LOCATION + ";create=true"); return con; } diff --git a/nifi/pom.xml b/nifi/pom.xml index 039057e8f3..0c71ba8ae5 100644 --- a/nifi/pom.xml +++ b/nifi/pom.xml @@ -857,6 +857,11 @@ json-path 2.0.0 + + org.apache.derby + derby + 10.11.1.1 +
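Note on usage (not introduced by this patch): the sketch below shows how the ResultSet-to-Avro conversion in JdbcCommon is typically driven, using the same Connection/Statement try-with-resources lifecycle the reworked ExecuteSQL.onTrigger() now uses, followed by the read-back loop the tests perform. The class name, the embedded-Derby URL, the tiny persons table, and the query timeout value are illustrative placeholders only.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

import org.apache.avro.file.DataFileStream;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.nifi.processors.standard.util.JdbcCommon;

public class JdbcCommonSketch {

    public static void main(final String[] args) throws Exception {
        // Placeholder embedded-Derby database and data; any JDBC source works the same way.
        final String jdbcUrl = "jdbc:derby:target/sketch-db;create=true";

        final ByteArrayOutputStream avroOut = new ByteArrayOutputStream();
        long rowsWritten = 0;

        // Same Connection/Statement lifecycle as the reworked ExecuteSQL.onTrigger().
        try (final Connection con = DriverManager.getConnection(jdbcUrl);
             final Statement st = con.createStatement()) {
            st.executeUpdate("create table persons (id integer, name varchar(100), code integer)");
            st.executeUpdate("insert into persons values (1, 'Joe', 100)");
            st.executeUpdate("insert into persons values (2, 'Carol', 200)");

            st.setQueryTimeout(10); // seconds, mirroring the processor's query timeout setting
            final ResultSet resultSet = st.executeQuery("select * from persons");

            // Builds an Avro schema from ResultSetMetaData, then streams one
            // GenericRecord per row into the output stream.
            rowsWritten = JdbcCommon.convertToAvroStream(resultSet, avroOut);
        }

        // Read the records back, mirroring the verification loop in TestExecuteSQL.
        final DatumReader<GenericRecord> datumReader = new GenericDatumReader<>();
        try (final DataFileStream<GenericRecord> reader =
                new DataFileStream<>(new ByteArrayInputStream(avroOut.toByteArray()), datumReader)) {
            GenericRecord record = null;
            long recordsRead = 0;
            while (reader.hasNext()) {
                record = reader.next(record); // reuse the record object to limit garbage
                recordsRead += 1;
            }
            System.out.println(rowsWritten + " rows written, " + recordsRead + " records read back");
        }
    }
}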