From 14bcad212a4e2c7242e892c0389c8da885e87e9c Mon Sep 17 00:00:00 2001
From: Toivo Adams
Date: Tue, 26 May 2015 14:36:59 +0300
Subject: [PATCH] NIFI-626 - First working version.

Signed-off-by: Toivo Adams
Signed-off-by: Mark Payne
---
 .../nifi/processors/standard/ExecuteSQL.java       | 77 ++++++++--------
 .../processors/standard/util/JdbcCommon.java       | 13 +--
 .../org.apache.nifi.processor.Processor            |  1 +
 .../processors/standard/TestExecuteSQL.java        | 91 ++++++++++++-------
 4 files changed, 101 insertions(+), 81 deletions(-)

diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
index 9003b4a8a8..37fd0f77c0 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
@@ -17,7 +17,6 @@ package org.apache.nifi.processors.standard;
 
 import java.io.IOException;
-import java.io.InputStream;
 import java.io.OutputStream;
 import java.sql.Connection;
 import java.sql.ResultSet;
 import java.sql.SQLException;
@@ -30,24 +29,20 @@ import java.util.List;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.dbcp.DBCPService;
-import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
 import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.stream.io.StreamUtils;
 import org.apache.nifi.logging.ProcessorLog;
 import org.apache.nifi.processor.AbstractProcessor;
-import org.apache.nifi.processor.DataUnit;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.annotation.documentation.CapabilityDescription;
-import org.apache.nifi.annotation.behavior.EventDriven;
-import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.processor.exception.FlowFileAccessException;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.io.OutputStreamCallback;
-import org.apache.nifi.processor.io.StreamCallback;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.processors.standard.util.JdbcCommon;
 import org.apache.nifi.util.StopWatch;
@@ -92,6 +87,7 @@ public class ExecuteSQL extends AbstractProcessor {
         relationships = Collections.unmodifiableSet(r);
 
         ArrayList<PropertyDescriptor> pds = new ArrayList<>();
+        pds.add(DBCP_SERVICE);
         pds.add(SQL_SELECT_QUERY);
         propDescriptors = Collections.unmodifiableList(pds);
     }
@@ -108,47 +104,54 @@ public class ExecuteSQL extends AbstractProcessor {
     @Override
     public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
-        FlowFile flowFile = session.get();
-        if (flowFile == null) {
+        FlowFile incoming = session.get();
+        if (incoming == null) {
             return;
         }
 
         final ProcessorLog logger = getLogger();
 
         final DBCPService dbcpService = context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class);
-        final String selectQuery = context.getProperty(SQL_SELECT_QUERY).getValue();
+        final String selectQuery = context.getProperty(SQL_SELECT_QUERY).evaluateAttributeExpressions(incoming).getValue();
+        final StopWatch stopWatch = new StopWatch(true);
 
         try {
             final Connection con = dbcpService.getConnection();
-            final Statement st = con.createStatement();
-
-            final StopWatch stopWatch = new StopWatch(true);
-
-            flowFile = session.write(flowFile, new OutputStreamCallback() {
-                @Override
-                public void process(final OutputStream out) throws IOException {
-                    try {
-                        ResultSet resultSet = st.executeQuery(selectQuery);
-                        long nrOfRows = JdbcCommon.convertToAvroStream(resultSet, out);
-
-
-                    } catch (SQLException e) {
-                        // TODO Auto-generated catch block
-                        e.printStackTrace();
-                    }
+            try {
+                final Statement st = con.createStatement();
+                try {
+                    FlowFile outgoing = session.write(incoming, new OutputStreamCallback() {
+                        @Override
+                        public void process(final OutputStream out) throws IOException {
+                            try {
+                                logger.info("Executing query {}", new Object[]{selectQuery});
+                                ResultSet resultSet = st.executeQuery(selectQuery);
+                                long nrOfRows = JdbcCommon.convertToAvroStream(resultSet, out);
+                                logger.info("Result FlowFile contains {} Avro records", new Object[]{nrOfRows});
+
+                            } catch (SQLException e) {
+                                throw new ProcessException(e);
+                            }
+                        }
+                    });
+
+                    logger.info("Transferred {} to 'success'", new Object[]{outgoing});
+                    session.getProvenanceReporter().modifyContent(outgoing, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
+                    session.transfer(outgoing, REL_SUCCESS);
+                } finally {
+                    st.close();
                 }
-            });
+            } finally {
+                // return the connection to the pool
+                con.close();
+            }
 
-            logger.info("Transferred {} to 'success'", new Object[]{flowFile});
-            session.getProvenanceReporter().modifyContent(flowFile, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
-            session.transfer(flowFile, REL_SUCCESS);
-
-        } catch (FlowFileAccessException e) {
-            // TODO Auto-generated catch block
-            e.printStackTrace();
+        } catch (ProcessException e) {
+            logger.error("Unable to execute SQL select query due to {}", new Object[]{e});
+            session.transfer(incoming, REL_FAILURE);
         } catch (SQLException e) {
-            // TODO Auto-generated catch block
-            e.printStackTrace();
+            logger.error("Unable to execute SQL select query due to {}", new Object[]{e});
+            session.transfer(incoming, REL_FAILURE);
         }
     }
 }
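The reworked onTrigger() above acquires a pooled Connection, creates a Statement, streams the ResultSet into the outgoing FlowFile as Avro, and releases both resources in nested finally blocks so they return to the pool even when the query fails. The same acquire/use/release discipline is shown below as a standalone Java 7 try-with-resources sketch; the Derby URL and table name are illustrative assumptions, not part of this patch:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.SQLException;
    import java.sql.Statement;

    public class SelectAndStreamSketch {
        public static void main(String[] args) throws SQLException {
            // Resources close in reverse order (ResultSet, Statement, Connection),
            // mirroring the nested finally blocks in ExecuteSQL.onTrigger().
            try (Connection con = DriverManager.getConnection("jdbc:derby:testdb;create=true");
                 Statement st = con.createStatement();
                 ResultSet rs = st.executeQuery("select * from persons")) {
                while (rs.next()) {
                    System.out.println(rs.getString(1));
                }
            }
        }
    }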
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java
index a361bd6dfe..c9ad423b05 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java
@@ -16,7 +16,6 @@
  */
 package org.apache.nifi.processors.standard.util;
 
-import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.sql.ResultSet;
@@ -30,9 +29,7 @@ import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.io.BinaryEncoder;
 import org.apache.avro.io.DatumWriter;
-import org.apache.avro.io.EncoderFactory;
 
 
 /**
@@ -46,9 +43,6 @@ public class JdbcCommon {
         Schema schema = createSchema(rs);
         GenericRecord rec = new GenericData.Record(schema);
 
-//      ByteArrayOutputStream out = new ByteArrayOutputStream();
-//      BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
-
         DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<GenericRecord>(schema);
         DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<GenericRecord>(datumWriter);
         dataFileWriter.create(schema, outStream);
@@ -67,10 +61,6 @@ public class JdbcCommon {
         dataFileWriter.close();
         return nrOfRows;
-
-//        encoder.flush();
-//        out.close();
-//        byte[] serializedBytes = out.toByteArray();
-//        return serializedBytes;
     }
 
     public static Schema createSchema(ResultSet rs) throws SQLException {
@@ -82,7 +72,8 @@ public class JdbcCommon {
 
         FieldAssembler<Schema> builder = SchemaBuilder.record(tableName).namespace("any.data").fields();
 
         /**
-         * Type conversion is not precise and is incomplete, needs to be fixed!!!!!!
+         * Some Avro types are still missing (e.g. Decimal and Date types);
+         * this may need some additional work.
          */
         for (int i = 1; i <= nrOfColumns; i++)
             switch (meta.getColumnType(i)) {
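JdbcCommon.convertToAvroStream() follows the standard Avro container-file recipe: derive a Schema, wrap a GenericDatumWriter in a DataFileWriter, create() it against the output stream, and append() one GenericRecord per row. A minimal self-contained sketch of that recipe follows; the record name and fields are invented for illustration:

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;

    import org.apache.avro.Schema;
    import org.apache.avro.SchemaBuilder;
    import org.apache.avro.file.DataFileWriter;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericDatumWriter;
    import org.apache.avro.generic.GenericRecord;

    public class AvroWriteSketch {
        public static void main(String[] args) throws IOException {
            // Build a two-field record schema, as createSchema() does from ResultSetMetaData.
            Schema schema = SchemaBuilder.record("example").namespace("any.data").fields()
                    .name("id").type().intType().noDefault()
                    .name("name").type().nullable().stringType().noDefault()
                    .endRecord();

            ByteArrayOutputStream out = new ByteArrayOutputStream();
            DataFileWriter<GenericRecord> writer =
                    new DataFileWriter<GenericRecord>(new GenericDatumWriter<GenericRecord>(schema));
            writer.create(schema, out);

            // One GenericRecord per "row", as in the ResultSet loop.
            GenericRecord rec = new GenericData.Record(schema);
            rec.put("id", 1);
            rec.put("name", "test");
            writer.append(rec);
            writer.close();

            System.out.println("wrote " + out.size() + " bytes of Avro");
        }
    }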
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index 17339bc8a2..e62b57ff1f 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -66,3 +66,4 @@ org.apache.nifi.processors.standard.SplitXml
 org.apache.nifi.processors.standard.TransformXml
 org.apache.nifi.processors.standard.UnpackContent
 org.apache.nifi.processors.standard.ValidateXml
+org.apache.nifi.processors.standard.ExecuteSQL
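The new entry above registers ExecuteSQL with Java's service-provider mechanism: NiFi discovers the processors inside a NAR by asking java.util.ServiceLoader for org.apache.nifi.processor.Processor implementations, so a processor omitted from this file never appears in the UI. A standalone sketch of the lookup this entry feeds, for illustration only:

    import java.util.ServiceLoader;

    import org.apache.nifi.processor.Processor;

    public class ListStandardProcessors {
        public static void main(String[] args) {
            // Each line in META-INF/services/org.apache.nifi.processor.Processor
            // contributes one instance to this iteration.
            for (Processor p : ServiceLoader.load(Processor.class)) {
                System.out.println(p.getClass().getName());
            }
        }
    }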
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java
index 15bc06cab9..1b6de83bdc 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java
@@ -16,22 +16,32 @@
  */
 package org.apache.nifi.processors.standard;
 
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
 import java.sql.Connection;
 import java.sql.DriverManager;
-import java.util.Collection;
+import java.sql.SQLException;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.components.ValidationContext;
-import org.apache.nifi.components.ValidationResult;
-import org.apache.nifi.controller.ControllerServiceInitializationContext;
+import org.apache.avro.file.DataFileStream;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumReader;
+import org.apache.nifi.controller.AbstractControllerService;
 import org.apache.nifi.dbcp.DBCPService;
+import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processors.standard.util.TestJdbcHugeStream;
 import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
+import org.fusesource.hawtbuf.ByteArrayInputStream;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.slf4j.Logger;
@@ -58,61 +68,76 @@ public class TestExecuteSQL {
     }
 
     @Test
-    public void test1() throws InitializationException {
+    public void test1() throws InitializationException, ClassNotFoundException, SQLException, IOException {
 
         final TestRunner runner = TestRunners.newTestRunner(ExecuteSQL.class);
         final DBCPService dbcp = new DBCPServiceSimpleImpl();
         final Map<String, String> dbcpProperties = new HashMap<>();
-        dbcpProperties.put("Database Host", "NA");    // Embedded Derby don't use host
-        dbcpProperties.put("Database Port", "1");     // Embedded Derby don't use port, but must have value anyway
-        dbcpProperties.put("Database Name", DB_LOCATION);
-        dbcpProperties.put("Database User", "tester");
-        dbcpProperties.put("Password", "testerp");
 
         runner.addControllerService("dbcp", dbcp, dbcpProperties);
+        runner.enableControllerService(dbcp);
         runner.setProperty(ExecuteSQL.DBCP_SERVICE, "dbcp");
 
+        // remove previous test database, if any
+        File dbLocation = new File(DB_LOCATION);
+        dbLocation.delete();
+
+        // load test data into the database
+        Connection con = dbcp.getConnection();
+        TestJdbcHugeStream.loadTestData2Database(con, 100, 100, 100);
+        System.out.println("test data loaded");
+
+        // ResultSet size will be 1x100x100 = 10000 rows
+        // because of the "where PER.ID = ${person.id}" clause
+        final int nrOfRows = 10000;
         String query = "select "
             + " PER.ID as PersonId, PER.NAME as PersonName, PER.CODE as PersonCode"
            + ", PRD.ID as ProductId,PRD.NAME as ProductName,PRD.CODE as ProductCode"
            + ", REL.ID as RelId, REL.NAME as RelName, REL.CODE as RelCode"
            + ", ROW_NUMBER() OVER () as rownr "
-           + " from persons PER, products PRD, relationships REL";
+           + " from persons PER, products PRD, relationships REL"
+           + " where PER.ID = ${person.id}";
 
         runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, query);
-        runner.enableControllerService(dbcp);
-
-        runner.enqueue("Hello".getBytes());
+
+        // incoming FlowFile content is not used, but attributes are used
+        Map<String, String> attributes = new HashMap<String, String>();
+        attributes.put("person.id", "10");
+        runner.enqueue("Hello".getBytes(), attributes);
         runner.run();
 
         runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_SUCCESS, 1);
-        runner.clearTransferState();
+
+        // read all Avro records and verify the created FlowFile contains 10000 records
+        List<MockFlowFile> flowfiles = runner.getFlowFilesForRelationship(ExecuteSQL.REL_SUCCESS);
+        InputStream in = new ByteArrayInputStream(flowfiles.get(0).toByteArray());
+        DatumReader<GenericRecord> datumReader = new GenericDatumReader<GenericRecord>();
+        DataFileStream<GenericRecord> dataFileReader = new DataFileStream<GenericRecord>(in, datumReader);
+        GenericRecord record = null;
+        long recordsFromStream = 0;
+        while (dataFileReader.hasNext()) {
+            // Reuse the record object by passing it to next(). This saves us from
+            // allocating and garbage collecting many objects for files with many items.
+            record = dataFileReader.next(record);
+//            System.out.println(record);
+            recordsFromStream += 1;
+        }
+        System.out.println("total nr of records from stream: " + recordsFromStream);
+        assertEquals(nrOfRows, recordsFromStream);
+        dataFileReader.close();
     }
 
     /**
      * Simple implementation only for ExecuteSQL processor testing.
      *
      */
-    class DBCPServiceSimpleImpl implements DBCPService {
+    class DBCPServiceSimpleImpl extends AbstractControllerService implements DBCPService {
 
         @Override
-        public void initialize(ControllerServiceInitializationContext context) throws InitializationException { }
-
-        @Override
-        public Collection<ValidationResult> validate(ValidationContext context) { return null; }
-
-        @Override
-        public PropertyDescriptor getPropertyDescriptor(String name) { return null; }
-
-        @Override
-        public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) { }
-
-        @Override
-        public List<PropertyDescriptor> getPropertyDescriptors() { return null; }
-
-        @Override
-        public String getIdentifier() { return null; }
+        public String getIdentifier() {
+            return "dbcp";
+        }
 
         @Override
         public Connection getConnection() throws ProcessException {
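For orientation, a test-only implementation of getConnection() would typically open an embedded Derby connection against DB_LOCATION. A sketch inferred from the DriverManager import and the Derby comments in the test above; the driver class and URL options are assumptions, not the literal patch content:

    @Override
    public Connection getConnection() throws ProcessException {
        try {
            // Embedded Derby: DB_LOCATION is the on-disk database directory
            // that test1() deletes and recreates before loading test data.
            Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
            return DriverManager.getConnection("jdbc:derby:" + DB_LOCATION + ";create=true");
        } catch (final Exception e) {
            throw new ProcessException("getConnection failed: " + e, e);
        }
    }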