diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml index 8cce56d2e5..aa8bfbe323 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml @@ -1,272 +1,285 @@ - - - 4.0.0 - - org.apache.nifi - nifi-standard-bundle - 0.2.0-incubating-SNAPSHOT - - nifi-standard-processors - jar - - - org.apache.nifi - nifi-api - provided - - - org.apache.nifi - nifi-processor-utils - - - org.apache.nifi - nifi-utils - - - org.apache.nifi - nifi-ssl-context-service-api - - - org.apache.nifi - nifi-flowfile-packager - - - org.apache.nifi - nifi-distributed-cache-client-service-api - - - org.apache.nifi - nifi-http-context-map-api - - - commons-io - commons-io - - - com.sun.jersey - jersey-client - - - com.sun.jersey - jersey-server - - - commons-net - commons-net - - - org.apache.commons - commons-compress - - - org.apache.commons - commons-lang3 - - - org.bouncycastle - bcprov-jdk16 - - - org.bouncycastle - bcpg-jdk16 - - - commons-codec - commons-codec - - - org.apache.nifi - nifi-security-utils - - - com.jcraft - jsch - - - com.jcraft - jzlib - - - org.eclipse.jetty - jetty-server - - - org.eclipse.jetty - jetty-servlet - - - org.apache.httpcomponents - httpclient - - - javax.mail - mail - - - com.github.jponge - lzma-java - - - org.tukaani - xz - - - net.sf.saxon - Saxon-HE - - - org.apache.nifi - nifi-mock - test - - - org.apache.nifi - nifi-socket-utils - - - org.apache.nifi - nifi-load-distribution-service-api - - - org.apache.nifi - nifi-distributed-cache-client-service - test - - - joda-time - joda-time - - - javax.jms - javax.jms-api - - - org.apache.activemq - activemq-client - - - com.jayway.jsonpath - json-path - - - org.apache.nifi - nifi-ssl-context-service - test - - - org.apache.tika - tika-core - 1.7 - - - com.fasterxml.jackson.core - jackson-databind - 2.4.5 - - - - - - org.apache.rat - apache-rat-plugin - - - src/test/resources/localhost.cer - src/test/resources/hello.txt - src/test/resources/CharacterSetConversionSamples/Converted.txt - src/test/resources/CharacterSetConversionSamples/Original.txt - src/test/resources/CompressedData/SampleFile.txt - src/test/resources/CompressedData/SampleFileConcat.txt - src/test/resources/ExecuteCommand/1000bytes.txt - src/test/resources/ExecuteCommand/test.txt - src/test/resources/ScanAttribute/dictionary-with-empty-new-lines - src/test/resources/ScanAttribute/dictionary-with-extra-info - src/test/resources/ScanAttribute/dictionary1 - src/test/resources/TestEncryptContent/text.txt - src/test/resources/TestEncryptContent/text.txt.asc - src/test/resources/TestIdentifyMimeType/1.txt - src/test/resources/TestJson/json-sample.json - src/test/resources/TestJson/control-characters.json - src/test/resources/TestMergeContent/demarcate - src/test/resources/TestMergeContent/foot - src/test/resources/TestMergeContent/head - src/test/resources/TestModifyBytes/noFooter.txt - src/test/resources/TestModifyBytes/noFooter_noHeader.txt - src/test/resources/TestModifyBytes/noHeader.txt - src/test/resources/TestModifyBytes/testFile.txt - src/test/resources/TestReplaceTextLineByLine/$1$1.txt - src/test/resources/TestReplaceTextLineByLine/BRue_cRue_RiRey.txt - src/test/resources/TestReplaceTextLineByLine/Blu$2e_clu$2e.txt - src/test/resources/TestReplaceTextLineByLine/D$d_h$d.txt - src/test/resources/TestReplaceTextLineByLine/Good.txt - src/test/resources/TestReplaceTextLineByLine/Spider.txt - src/test/resources/TestReplaceTextLineByLine/[DODO].txt - src/test/resources/TestReplaceTextLineByLine/cu[$1]_Po[$1].txt - src/test/resources/TestReplaceTextLineByLine/cu_Po.txt - src/test/resources/TestReplaceTextLineByLine/food.txt - src/test/resources/TestReplaceTextLineByLine/testFile.txt - src/test/resources/TestReplaceTextWithMapping/color-fruit-backreference-mapping.txt - src/test/resources/TestReplaceTextWithMapping/color-fruit-blank-mapping.txt - src/test/resources/TestReplaceTextWithMapping/color-fruit-escaped-dollar-mapping.txt - src/test/resources/TestReplaceTextWithMapping/color-fruit-excessive-backreference-mapping-simple.txt - src/test/resources/TestReplaceTextWithMapping/color-fruit-excessive-backreference-mapping.txt - src/test/resources/TestReplaceTextWithMapping/color-fruit-invalid-backreference-mapping.txt - src/test/resources/TestReplaceTextWithMapping/color-fruit-mapping.txt - src/test/resources/TestReplaceTextWithMapping/color-fruit-no-match-mapping.txt - src/test/resources/TestReplaceTextWithMapping/color-fruit-space-mapping.txt - src/test/resources/TestReplaceTextWithMapping/colors-without-dashes.txt - src/test/resources/TestReplaceTextWithMapping/colors.txt - src/test/resources/TestScanContent/helloWorld - src/test/resources/TestScanContent/wellthengood-bye - src/test/resources/TestSplitText/1.txt - src/test/resources/TestSplitText/2.txt - src/test/resources/TestSplitText/3.txt - src/test/resources/TestSplitText/4.txt - src/test/resources/TestSplitText/5.txt - src/test/resources/TestSplitText/6.txt - src/test/resources/TestSplitText/original.txt - src/test/resources/TestTransformXml/math.html - src/test/resources/TestTransformXml/tokens.csv - src/test/resources/TestTransformXml/tokens.xml - src/test/resources/TestUnpackContent/folder/cal.txt - src/test/resources/TestUnpackContent/folder/date.txt - src/test/resources/TestUnpackContent/data.flowfilev2 - src/test/resources/TestUnpackContent/data.flowfilev3 - src/test/resources/TestXml/xml-bundle-1 - src/test/resources/CompressedData/SampleFile.txt.bz2 - src/test/resources/CompressedData/SampleFile.txt.gz - src/test/resources/CompressedData/SampleFile1.txt.bz2 - src/test/resources/CompressedData/SampleFile1.txt.gz - src/test/resources/CompressedData/SampleFileConcat.txt.bz2 - src/test/resources/ExecuteCommand/TestIngestAndUpdate.jar - src/test/resources/ExecuteCommand/TestSuccess.jar - src/test/resources/ExecuteCommand/TestDynamicEnvironment.jar - src/test/resources/TestIdentifyMimeType/1.jar - src/test/resources/TestIdentifyMimeType/1.tar - src/test/resources/TestIdentifyMimeType/1.tar.gz - src/test/resources/TestIdentifyMimeType/1.txt.bz2 - src/test/resources/TestIdentifyMimeType/1.txt.gz - src/test/resources/TestIdentifyMimeType/1.zip - src/test/resources/TestIdentifyMimeType/flowfilev1.tar - src/test/resources/TestUnpackContent/data.tar - src/test/resources/TestUnpackContent/data.zip - - - - - + + + 4.0.0 + + org.apache.nifi + nifi-standard-bundle + 0.2.0-incubating-SNAPSHOT + + nifi-standard-processors + jar + + + org.apache.nifi + nifi-api + provided + + + org.apache.nifi + nifi-processor-utils + + + org.apache.nifi + nifi-utils + + + org.apache.nifi + nifi-ssl-context-service-api + + + org.apache.nifi + nifi-flowfile-packager + + + org.apache.nifi + nifi-distributed-cache-client-service-api + + + org.apache.nifi + nifi-http-context-map-api + + + commons-io + commons-io + + + com.sun.jersey + jersey-client + + + com.sun.jersey + jersey-server + + + commons-net + commons-net + + + org.apache.commons + commons-compress + + + org.apache.commons + commons-lang3 + + + org.bouncycastle + bcprov-jdk16 + + + org.bouncycastle + bcpg-jdk16 + + + commons-codec + commons-codec + + + org.apache.nifi + nifi-security-utils + + + com.jcraft + jsch + + + com.jcraft + jzlib + + + org.eclipse.jetty + jetty-server + + + org.eclipse.jetty + jetty-servlet + + + org.apache.httpcomponents + httpclient + + + javax.mail + mail + + + com.github.jponge + lzma-java + + + org.tukaani + xz + + + net.sf.saxon + Saxon-HE + + + org.apache.nifi + nifi-mock + test + + + org.apache.nifi + nifi-socket-utils + + + org.apache.nifi + nifi-load-distribution-service-api + + + org.apache.nifi + nifi-distributed-cache-client-service + test + + + joda-time + joda-time + + + javax.jms + javax.jms-api + + + org.apache.activemq + activemq-client + + + com.jayway.jsonpath + json-path + + + org.apache.nifi + nifi-ssl-context-service + test + + + org.apache.tika + tika-core + 1.7 + + + com.fasterxml.jackson.core + jackson-databind + 2.4.5 + + + org.apache.avro + avro + + + org.apache.nifi + nifi-dbcp-service-api + + + + org.apache.derby + derby + 10.11.1.1 + test + + + + + + + org.apache.rat + apache-rat-plugin + + + src/test/resources/localhost.cer + src/test/resources/hello.txt + src/test/resources/CharacterSetConversionSamples/Converted.txt + src/test/resources/CharacterSetConversionSamples/Original.txt + src/test/resources/CompressedData/SampleFile.txt + src/test/resources/CompressedData/SampleFileConcat.txt + src/test/resources/ExecuteCommand/1000bytes.txt + src/test/resources/ExecuteCommand/test.txt + src/test/resources/ScanAttribute/dictionary-with-empty-new-lines + src/test/resources/ScanAttribute/dictionary-with-extra-info + src/test/resources/ScanAttribute/dictionary1 + src/test/resources/TestEncryptContent/text.txt + src/test/resources/TestEncryptContent/text.txt.asc + src/test/resources/TestIdentifyMimeType/1.txt + src/test/resources/TestJson/json-sample.json + src/test/resources/TestJson/control-characters.json + src/test/resources/TestMergeContent/demarcate + src/test/resources/TestMergeContent/foot + src/test/resources/TestMergeContent/head + src/test/resources/TestModifyBytes/noFooter.txt + src/test/resources/TestModifyBytes/noFooter_noHeader.txt + src/test/resources/TestModifyBytes/noHeader.txt + src/test/resources/TestModifyBytes/testFile.txt + src/test/resources/TestReplaceTextLineByLine/$1$1.txt + src/test/resources/TestReplaceTextLineByLine/BRue_cRue_RiRey.txt + src/test/resources/TestReplaceTextLineByLine/Blu$2e_clu$2e.txt + src/test/resources/TestReplaceTextLineByLine/D$d_h$d.txt + src/test/resources/TestReplaceTextLineByLine/Good.txt + src/test/resources/TestReplaceTextLineByLine/Spider.txt + src/test/resources/TestReplaceTextLineByLine/[DODO].txt + src/test/resources/TestReplaceTextLineByLine/cu[$1]_Po[$1].txt + src/test/resources/TestReplaceTextLineByLine/cu_Po.txt + src/test/resources/TestReplaceTextLineByLine/food.txt + src/test/resources/TestReplaceTextLineByLine/testFile.txt + src/test/resources/TestReplaceTextWithMapping/color-fruit-backreference-mapping.txt + src/test/resources/TestReplaceTextWithMapping/color-fruit-blank-mapping.txt + src/test/resources/TestReplaceTextWithMapping/color-fruit-escaped-dollar-mapping.txt + src/test/resources/TestReplaceTextWithMapping/color-fruit-excessive-backreference-mapping-simple.txt + src/test/resources/TestReplaceTextWithMapping/color-fruit-excessive-backreference-mapping.txt + src/test/resources/TestReplaceTextWithMapping/color-fruit-invalid-backreference-mapping.txt + src/test/resources/TestReplaceTextWithMapping/color-fruit-mapping.txt + src/test/resources/TestReplaceTextWithMapping/color-fruit-no-match-mapping.txt + src/test/resources/TestReplaceTextWithMapping/color-fruit-space-mapping.txt + src/test/resources/TestReplaceTextWithMapping/colors-without-dashes.txt + src/test/resources/TestReplaceTextWithMapping/colors.txt + src/test/resources/TestScanContent/helloWorld + src/test/resources/TestScanContent/wellthengood-bye + src/test/resources/TestSplitText/1.txt + src/test/resources/TestSplitText/2.txt + src/test/resources/TestSplitText/3.txt + src/test/resources/TestSplitText/4.txt + src/test/resources/TestSplitText/5.txt + src/test/resources/TestSplitText/6.txt + src/test/resources/TestSplitText/original.txt + src/test/resources/TestTransformXml/math.html + src/test/resources/TestTransformXml/tokens.csv + src/test/resources/TestTransformXml/tokens.xml + src/test/resources/TestUnpackContent/folder/cal.txt + src/test/resources/TestUnpackContent/folder/date.txt + src/test/resources/TestUnpackContent/data.flowfilev2 + src/test/resources/TestUnpackContent/data.flowfilev3 + src/test/resources/TestXml/xml-bundle-1 + src/test/resources/CompressedData/SampleFile.txt.bz2 + src/test/resources/CompressedData/SampleFile.txt.gz + src/test/resources/CompressedData/SampleFile1.txt.bz2 + src/test/resources/CompressedData/SampleFile1.txt.gz + src/test/resources/CompressedData/SampleFileConcat.txt.bz2 + src/test/resources/ExecuteCommand/TestIngestAndUpdate.jar + src/test/resources/ExecuteCommand/TestSuccess.jar + src/test/resources/ExecuteCommand/TestDynamicEnvironment.jar + src/test/resources/TestIdentifyMimeType/1.jar + src/test/resources/TestIdentifyMimeType/1.tar + src/test/resources/TestIdentifyMimeType/1.tar.gz + src/test/resources/TestIdentifyMimeType/1.txt.bz2 + src/test/resources/TestIdentifyMimeType/1.txt.gz + src/test/resources/TestIdentifyMimeType/1.zip + src/test/resources/TestIdentifyMimeType/flowfilev1.tar + src/test/resources/TestUnpackContent/data.tar + src/test/resources/TestUnpackContent/data.zip + + + + + diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java new file mode 100644 index 0000000000..bee1d39a77 --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard; + +import java.io.IOException; +import java.io.OutputStream; +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.dbcp.DBCPService; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ProcessorLog; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.OutputStreamCallback; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.standard.util.JdbcCommon; +import org.apache.nifi.util.LongHolder; +import org.apache.nifi.util.StopWatch; + +@EventDriven +@Tags({ "sql", "select", "jdbc", "query", "database" }) +@CapabilityDescription("Execute provided SQL select query. Query result will be converted to Avro format." + + " Streaming is used so arbitrarily large result sets are supported.") +public class ExecuteSQL extends AbstractProcessor { + + // Relationships + public static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("Successfully created FlowFile from SQL query result set.") + .build(); + public static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description("SQL query execution failed. Incoming FlowFile will be penalized and routed to this relationship") + .build(); + private final Set relationships; + + public static final PropertyDescriptor DBCP_SERVICE = new PropertyDescriptor.Builder() + .name("Database Connection Pooling Service") + .description("The Controller Service that is used to obtain connection to database") + .required(true) + .identifiesControllerService(DBCPService.class) + .build(); + + public static final PropertyDescriptor SQL_SELECT_QUERY = new PropertyDescriptor.Builder() + .name("SQL select query") + .description("SQL select query") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(true) + .build(); + + public static final PropertyDescriptor QUERY_TIMEOUT = new PropertyDescriptor.Builder() + .name("Max Wait Time") + .description("The maximum amount of time allowed for a running SQL select query " + + " , zero means there is no limit. Max time less than 1 second will be equal to zero.") + .defaultValue("0 seconds") + .required(true) + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .sensitive(false) + .build(); + + private final List propDescriptors; + + public ExecuteSQL() { + final Set r = new HashSet<>(); + r.add(REL_SUCCESS); + relationships = Collections.unmodifiableSet(r); + + final List pds = new ArrayList<>(); + pds.add(DBCP_SERVICE); + pds.add(SQL_SELECT_QUERY); + pds.add(QUERY_TIMEOUT); + propDescriptors = Collections.unmodifiableList(pds); + } + + @Override + public Set getRelationships() { + return relationships; + } + + @Override + protected List getSupportedPropertyDescriptors() { + return propDescriptors; + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + final FlowFile incoming = session.get(); + if (incoming == null) { + return; + } + + final ProcessorLog logger = getLogger(); + + final DBCPService dbcpService = context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class); + final String selectQuery = context.getProperty(SQL_SELECT_QUERY).evaluateAttributeExpressions(incoming).getValue(); + final Integer queryTimeout = context.getProperty(QUERY_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).intValue(); + + final StopWatch stopWatch = new StopWatch(true); + + try (final Connection con = dbcpService.getConnection(); + final Statement st = con.createStatement()) { + st.setQueryTimeout(queryTimeout); // timeout in seconds + final LongHolder nrOfRows = new LongHolder(0L); + final FlowFile outgoing = session.write(incoming, new OutputStreamCallback() { + @Override + public void process(final OutputStream out) throws IOException { + try { + logger.debug("Executing query {}", new Object[] { selectQuery }); + final ResultSet resultSet = st.executeQuery(selectQuery); + nrOfRows.set(JdbcCommon.convertToAvroStream(resultSet, out)); + } catch (final SQLException e) { + throw new ProcessException(e); + } + } + }); + + logger.info("{} contains {} Avro records", new Object[] { nrOfRows.get() }); + logger.info("Transferred {} to 'success'", new Object[] { outgoing }); + session.getProvenanceReporter().modifyContent(outgoing, "Retrieved " + nrOfRows.get() + " rows", stopWatch.getElapsed(TimeUnit.MILLISECONDS)); + session.transfer(outgoing, REL_SUCCESS); + } catch (final ProcessException | SQLException e) { + logger.error("Unable to execute SQL select query {} for {} due to {}; routing to failure", new Object[] { selectQuery, incoming, e }); + session.transfer(incoming, REL_FAILURE); + } + } +} diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java new file mode 100644 index 0000000000..6fc69ff4bc --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard.util; + +import static java.sql.Types.BIGINT; +import static java.sql.Types.BOOLEAN; +import static java.sql.Types.CHAR; +import static java.sql.Types.DATE; +import static java.sql.Types.DECIMAL; +import static java.sql.Types.DOUBLE; +import static java.sql.Types.FLOAT; +import static java.sql.Types.INTEGER; +import static java.sql.Types.LONGNVARCHAR; +import static java.sql.Types.LONGVARCHAR; +import static java.sql.Types.NCHAR; +import static java.sql.Types.NUMERIC; +import static java.sql.Types.NVARCHAR; +import static java.sql.Types.REAL; +import static java.sql.Types.ROWID; +import static java.sql.Types.SMALLINT; +import static java.sql.Types.TIME; +import static java.sql.Types.TIMESTAMP; +import static java.sql.Types.TINYINT; +import static java.sql.Types.VARCHAR; + +import java.io.IOException; +import java.io.OutputStream; +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; + +import org.apache.avro.Schema; +import org.apache.avro.SchemaBuilder; +import org.apache.avro.SchemaBuilder.FieldAssembler; +import org.apache.avro.file.DataFileWriter; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericDatumWriter; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.DatumWriter; + +/** + * JDBC / SQL common functions. + */ +public class JdbcCommon { + + public static long convertToAvroStream(final ResultSet rs, final OutputStream outStream) throws SQLException, IOException { + final Schema schema = createSchema(rs); + final GenericRecord rec = new GenericData.Record(schema); + + final DatumWriter datumWriter = new GenericDatumWriter(schema); + try (final DataFileWriter dataFileWriter = new DataFileWriter(datumWriter)) { + dataFileWriter.create(schema, outStream); + + final ResultSetMetaData meta = rs.getMetaData(); + final int nrOfColumns = meta.getColumnCount(); + long nrOfRows = 0; + while (rs.next()) { + for (int i = 1; i <= nrOfColumns; i++) { + final Object value = rs.getObject(i); + + // The different types that we support are numbers (int, long, double, float), + // as well as boolean values and Strings. Since Avro doesn't provide + // timestamp types, we want to convert those to Strings. So we will cast anything other + // than numbers or booleans to strings by using to toString() method. + if (value == null) { + rec.put(i - 1, null); + } else if (value instanceof Number || value instanceof Boolean) { + rec.put(i - 1, value); + } else { + rec.put(i - 1, value.toString()); + } + } + dataFileWriter.append(rec); + nrOfRows += 1; + } + + return nrOfRows; + } + } + + public static Schema createSchema(final ResultSet rs) throws SQLException { + final ResultSetMetaData meta = rs.getMetaData(); + final int nrOfColumns = meta.getColumnCount(); + final String tableName = meta.getTableName(1); + + final FieldAssembler builder = SchemaBuilder.record(tableName).namespace("any.data").fields(); + + /** + * Some missing Avro types - Decimal, Date types. May need some additional work. + */ + for (int i = 1; i <= nrOfColumns; i++) { + switch (meta.getColumnType(i)) { + case CHAR: + case LONGNVARCHAR: + case LONGVARCHAR: + case NCHAR: + case NVARCHAR: + case VARCHAR: + builder.name(meta.getColumnName(i)).type().stringType().noDefault(); + break; + + case BOOLEAN: + builder.name(meta.getColumnName(i)).type().booleanType().noDefault(); + break; + + case INTEGER: + case SMALLINT: + case TINYINT: + builder.name(meta.getColumnName(i)).type().intType().noDefault(); + break; + + case BIGINT: + builder.name(meta.getColumnName(i)).type().longType().noDefault(); + break; + + // java.sql.RowId is interface, is seems to be database + // implementation specific, let's convert to String + case ROWID: + builder.name(meta.getColumnName(i)).type().stringType().noDefault(); + break; + + case FLOAT: + case REAL: + builder.name(meta.getColumnName(i)).type().floatType().noDefault(); + break; + + case DOUBLE: + builder.name(meta.getColumnName(i)).type().doubleType().noDefault(); + break; + + // Did not find direct suitable type, need to be clarified!!!! + case DECIMAL: + case NUMERIC: + builder.name(meta.getColumnName(i)).type().stringType().noDefault(); + break; + + // Did not find direct suitable type, need to be clarified!!!! + case DATE: + case TIME: + case TIMESTAMP: + builder.name(meta.getColumnName(i)).type().stringType().noDefault(); + break; + + default: + break; + } + } + + return builder.endRecord(); + } +} diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor index 17339bc8a2..e62b57ff1f 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -66,3 +66,4 @@ org.apache.nifi.processors.standard.SplitXml org.apache.nifi.processors.standard.TransformXml org.apache.nifi.processors.standard.UnpackContent org.apache.nifi.processors.standard.ValidateXml +org.apache.nifi.processors.standard.ExecuteSQL diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java new file mode 100644 index 0000000000..efa2705ef1 --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard; + +import static org.junit.Assert.assertEquals; + +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.avro.file.DataFileStream; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.DatumReader; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.dbcp.DBCPService; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processors.standard.util.TestJdbcHugeStream; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.fusesource.hawtbuf.ByteArrayInputStream; +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TestExecuteSQL { + + private static Logger LOGGER; + + static { + System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "info"); + System.setProperty("org.slf4j.simpleLogger.showDateTime", "true"); + System.setProperty("org.slf4j.simpleLogger.log.nifi.io.nio", "debug"); + System.setProperty("org.slf4j.simpleLogger.log.nifi.processors.standard.ExecuteSQL", "debug"); + System.setProperty("org.slf4j.simpleLogger.log.nifi.processors.standard.TestExecuteSQL", "debug"); + LOGGER = LoggerFactory.getLogger(TestExecuteSQL.class); + } + + final static String DB_LOCATION = "target/db"; + + @BeforeClass + public static void setup() { + System.setProperty("derby.stream.error.file", "target/derby.log"); + } + + @Test + public void testNoTimeLimit() throws InitializationException, ClassNotFoundException, SQLException, IOException { + invokeOnTrigger(null); + } + + @Test + public void testQueryTimeout() throws InitializationException, ClassNotFoundException, SQLException, IOException { + // Does to seem to have any effect when using embedded Derby + invokeOnTrigger(1); // 1 second max time + } + + public void invokeOnTrigger(final Integer queryTimeout) throws InitializationException, ClassNotFoundException, SQLException, IOException { + final TestRunner runner = TestRunners.newTestRunner(ExecuteSQL.class); + + final DBCPService dbcp = new DBCPServiceSimpleImpl(); + final Map dbcpProperties = new HashMap<>(); + + runner.addControllerService("dbcp", dbcp, dbcpProperties); + + runner.enableControllerService(dbcp); + runner.setProperty(ExecuteSQL.DBCP_SERVICE, "dbcp"); + + if (queryTimeout != null) { + runner.setProperty(ExecuteSQL.QUERY_TIMEOUT, queryTimeout.toString() + " secs"); + } + + // remove previous test database, if any + final File dbLocation = new File(DB_LOCATION); + dbLocation.delete(); + + // load test data to database + final Connection con = dbcp.getConnection(); + TestJdbcHugeStream.loadTestData2Database(con, 100, 2000, 1000); + LOGGER.info("test data loaded"); + + // ResultSet size will be 1x2000x1000 = 2 000 000 rows + // because of where PER.ID = ${person.id} + final int nrOfRows = 2000000; + final String query = "select " + + " PER.ID as PersonId, PER.NAME as PersonName, PER.CODE as PersonCode" + + ", PRD.ID as ProductId,PRD.NAME as ProductName,PRD.CODE as ProductCode" + + ", REL.ID as RelId, REL.NAME as RelName, REL.CODE as RelCode" + + ", ROW_NUMBER() OVER () as rownr " + + " from persons PER, products PRD, relationships REL" + + " where PER.ID = ${person.id}"; + + runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, query); + + // incoming FlowFile content is not used, but attributes are used + final Map attributes = new HashMap(); + attributes.put("person.id", "10"); + runner.enqueue("Hello".getBytes(), attributes); + + runner.run(); + runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_SUCCESS, 1); + + // read all Avro records and verify created FlowFile contains 1000000 + // records + final List flowfiles = runner.getFlowFilesForRelationship(ExecuteSQL.REL_SUCCESS); + final InputStream in = new ByteArrayInputStream(flowfiles.get(0).toByteArray()); + final DatumReader datumReader = new GenericDatumReader(); + final DataFileStream dataFileReader = new DataFileStream(in, datumReader); + GenericRecord record = null; + long recordsFromStream = 0; + while (dataFileReader.hasNext()) { + // Reuse record object by passing it to next(). This saves us from + // allocating and garbage collecting many objects for files with + // many items. + record = dataFileReader.next(record); + recordsFromStream += 1; + } + + LOGGER.info("total nr of records from stream: " + recordsFromStream); + assertEquals(nrOfRows, recordsFromStream); + dataFileReader.close(); + } + + /** + * Simple implementation only for ExecuteSQL processor testing. + * + */ + class DBCPServiceSimpleImpl extends AbstractControllerService implements DBCPService { + + @Override + public String getIdentifier() { + return "dbcp"; + } + + @Override + public Connection getConnection() throws ProcessException { + try { + Class.forName("org.apache.derby.jdbc.EmbeddedDriver"); + final Connection con = DriverManager.getConnection("jdbc:derby:" + DB_LOCATION + ";create=true"); + return con; + } catch (final Exception e) { + throw new ProcessException("getConnection failed: " + e); + } + } + } + +} diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcCommon.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcCommon.java new file mode 100644 index 0000000000..f54d4bacc3 --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcCommon.java @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard.util; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; + +import org.apache.avro.Schema; +import org.apache.avro.file.DataFileStream; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.DatumReader; +import org.junit.BeforeClass; +import org.junit.Test; + +public class TestJdbcCommon { + + final static String DB_LOCATION = "target/db"; + + @BeforeClass + public static void setup() { + System.setProperty("derby.stream.error.file", "target/derby.log"); + } + + String createTable = "create table restaurants(id integer, name varchar(20), city varchar(50))"; + String dropTable = "drop table restaurants"; + + @Test + public void testCreateSchema() throws ClassNotFoundException, SQLException { + + // remove previous test database, if any + final File dbLocation = new File(DB_LOCATION); + dbLocation.delete(); + + final Connection con = createConnection(); + final Statement st = con.createStatement(); + + try { + st.executeUpdate(dropTable); + } catch (final Exception e) { + // table may not exist, this is not serious problem. + } + + st.executeUpdate(createTable); + st.executeUpdate("insert into restaurants values (1, 'Irifunes', 'San Mateo')"); + st.executeUpdate("insert into restaurants values (2, 'Estradas', 'Daly City')"); + st.executeUpdate("insert into restaurants values (3, 'Prime Rib House', 'San Francisco')"); + + final ResultSet resultSet = st.executeQuery("select * from restaurants"); + + final Schema schema = JdbcCommon.createSchema(resultSet); + assertNotNull(schema); + + // records name, should be result set first column table name + // Notice! sql select may join data from different tables, other columns + // may have different table names + assertEquals("RESTAURANTS", schema.getName()); + assertNotNull(schema.getField("ID")); + assertNotNull(schema.getField("NAME")); + assertNotNull(schema.getField("CITY")); + + st.close(); + con.close(); + } + + @Test + public void testConvertToBytes() throws ClassNotFoundException, SQLException, IOException { + // remove previous test database, if any + final File dbLocation = new File(DB_LOCATION); + dbLocation.delete(); + + final Connection con = createConnection(); + final Statement st = con.createStatement(); + + try { + st.executeUpdate(dropTable); + } catch (final Exception e) { + // table may not exist, this is not serious problem. + } + + st.executeUpdate(createTable); + + st.executeUpdate("insert into restaurants values (1, 'Irifunes', 'San Mateo')"); + st.executeUpdate("insert into restaurants values (2, 'Estradas', 'Daly City')"); + st.executeUpdate("insert into restaurants values (3, 'Prime Rib House', 'San Francisco')"); + + final ResultSet resultSet = st.executeQuery("select R.*, ROW_NUMBER() OVER () as rownr from restaurants R"); + + final ByteArrayOutputStream outStream = new ByteArrayOutputStream(); + JdbcCommon.convertToAvroStream(resultSet, outStream); + + final byte[] serializedBytes = outStream.toByteArray(); + assertNotNull(serializedBytes); + System.out.println("Avro serialized result size in bytes: " + serializedBytes.length); + + st.close(); + con.close(); + + // Deserialize bytes to records + + final InputStream instream = new ByteArrayInputStream(serializedBytes); + + final DatumReader datumReader = new GenericDatumReader(); + try (final DataFileStream dataFileReader = new DataFileStream(instream, datumReader)) { + GenericRecord record = null; + while (dataFileReader.hasNext()) { + // Reuse record object by passing it to next(). This saves us from + // allocating and garbage collecting many objects for files with + // many items. + record = dataFileReader.next(record); + System.out.println(record); + } + } + } + + // many test use Derby as database, so ensure driver is available + @Test + public void testDriverLoad() throws ClassNotFoundException { + final Class clazz = Class.forName("org.apache.derby.jdbc.EmbeddedDriver"); + assertNotNull(clazz); + } + + private Connection createConnection() throws ClassNotFoundException, SQLException { + + Class.forName("org.apache.derby.jdbc.EmbeddedDriver"); + final Connection con = DriverManager.getConnection("jdbc:derby:" + DB_LOCATION + ";create=true"); + return con; + } + +} diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcHugeStream.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcHugeStream.java new file mode 100644 index 0000000000..654e6c8e80 --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcHugeStream.java @@ -0,0 +1,267 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard.util; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.Random; + +import org.apache.avro.file.DataFileStream; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.DatumReader; +import org.junit.BeforeClass; +import org.junit.Ignore; +import org.junit.Test; + +/** + * Test streaming using large number of result set rows. 1. Read data from + * database. 2. Create Avro schema from ResultSet meta data. 3. Read rows from + * ResultSet and write rows to Avro writer stream (Avro will create record for + * each row). 4. And finally read records from Avro stream to verify all data is + * present in Avro stream. + * + * + * Sql query will return all combinations from 3 table. For example when each + * table contain 1000 rows, result set will be 1 000 000 000 rows. + * + */ +public class TestJdbcHugeStream { + + final static String DB_LOCATION = "target/db"; + + @BeforeClass + public static void setup() { + System.setProperty("derby.stream.error.file", "target/derby.log"); + } + + /** + * In case of large record set this will fail with + * java.lang.OutOfMemoryError: Java heap space at + * java.util.Arrays.copyOf(Arrays.java:2271) at + * java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113) at + * java + * .io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93) + * at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140) at + * org .apache.avro.file. + * DataFileWriter$BufferedFileOutputStream$PositionFilter + * .write(DataFileWriter.java:446) + * + */ + @Test + @Ignore + public void readSend2StreamHuge_InMemory() throws ClassNotFoundException, SQLException, IOException { + + // remove previous test database, if any + final File dbLocation = new File(DB_LOCATION); + dbLocation.delete(); + + try (final Connection con = createConnection()) { + loadTestData2Database(con, 150, 150, 150); + System.out.println("test data loaded"); + + try (final Statement st = con.createStatement()) { + // Notice! + // Following select is deliberately invalid! + // For testing we need huge amount of rows, so where part is not + // used. + final ResultSet resultSet = st.executeQuery("select " + + " PER.ID as PersonId, PER.NAME as PersonName, PER.CODE as PersonCode" + + ", PRD.ID as ProductId,PRD.NAME as ProductName,PRD.CODE as ProductCode" + + ", REL.ID as RelId, REL.NAME as RelName, REL.CODE as RelCode" + + ", ROW_NUMBER() OVER () as rownr " + + " from persons PER, products PRD, relationships REL"); + + final ByteArrayOutputStream outStream = new ByteArrayOutputStream(); + final long nrOfRows = JdbcCommon.convertToAvroStream(resultSet, outStream); + System.out.println("total nr of rows in resultset: " + nrOfRows); + + final byte[] serializedBytes = outStream.toByteArray(); + assertNotNull(serializedBytes); + System.out.println("Avro serialized result size in bytes: " + serializedBytes.length); + + // Deserialize bytes to records + + final InputStream instream = new ByteArrayInputStream(serializedBytes); + + final DatumReader datumReader = new GenericDatumReader(); + try (final DataFileStream dataFileReader = new DataFileStream(instream, datumReader)) { + GenericRecord record = null; + long recordsFromStream = 0; + while (dataFileReader.hasNext()) { + // Reuse record object by passing it to next(). This + // saves us from + // allocating and garbage collecting many objects for + // files with many items. + record = dataFileReader.next(record); + recordsFromStream += 1; + } + System.out.println("total nr of records from stream: " + recordsFromStream); + assertEquals(nrOfRows, recordsFromStream); + } + } + } + } + + @Test + public void readSend2StreamHuge_FileBased() throws ClassNotFoundException, SQLException, IOException { + + // remove previous test database, if any + final File dbLocation = new File(DB_LOCATION); + dbLocation.delete(); + + try (final Connection con = createConnection()) { + loadTestData2Database(con, 300, 300, 300); + + try (final Statement st = con.createStatement()) { + // Notice! + // Following select is deliberately invalid! + // For testing we need huge amount of rows, so where part is not + // used. + final ResultSet resultSet = st.executeQuery("select " + + " PER.ID as PersonId, PER.NAME as PersonName, PER.CODE as PersonCode" + + ", PRD.ID as ProductId,PRD.NAME as ProductName,PRD.CODE as ProductCode" + + ", REL.ID as RelId, REL.NAME as RelName, REL.CODE as RelCode" + + ", ROW_NUMBER() OVER () as rownr " + + " from persons PER, products PRD, relationships REL"); + + final OutputStream outStream = new FileOutputStream("target/data.avro"); + final long nrOfRows = JdbcCommon.convertToAvroStream(resultSet, outStream); + + // Deserialize bytes to records + final InputStream instream = new FileInputStream("target/data.avro"); + + final DatumReader datumReader = new GenericDatumReader(); + try (final DataFileStream dataFileReader = new DataFileStream(instream, datumReader)) { + GenericRecord record = null; + long recordsFromStream = 0; + while (dataFileReader.hasNext()) { + // Reuse record object by passing it to next(). This + // saves us from + // allocating and garbage collecting many objects for + // files with many items. + record = dataFileReader.next(record); + recordsFromStream += 1; + } + System.out.println("total nr of records from stream: " + recordsFromStream); + assertEquals(nrOfRows, recordsFromStream); + } + } + } + } + + // ================================================ helpers + // =============================================== + + static String dropPersons = "drop table persons"; + static String dropProducts = "drop table products"; + static String dropRelationships = "drop table relationships"; + static String createPersons = "create table persons (id integer, name varchar(100), code integer)"; + static String createProducts = "create table products (id integer, name varchar(100), code integer)"; + static String createRelationships = "create table relationships (id integer,name varchar(100), code integer)"; + + static public void loadTestData2Database(Connection con, int nrOfPersons, int nrOfProducts, int nrOfRels) throws ClassNotFoundException, SQLException { + + System.out.println(createRandomName()); + System.out.println(createRandomName()); + System.out.println(createRandomName()); + + final Statement st = con.createStatement(); + + // tables may not exist, this is not serious problem. + try { + st.executeUpdate(dropPersons); + } catch (final Exception e) { + } + + try { + st.executeUpdate(dropProducts); + } catch (final Exception e) { + } + + try { + st.executeUpdate(dropRelationships); + } catch (final Exception e) { + } + + st.executeUpdate(createPersons); + st.executeUpdate(createProducts); + st.executeUpdate(createRelationships); + + for (int i = 0; i < nrOfPersons; i++) + loadPersons(st, i); + + for (int i = 0; i < nrOfProducts; i++) + loadProducts(st, i); + + for (int i = 0; i < nrOfRels; i++) + loadRelationships(st, i); + + st.close(); + } + + static Random rng = new Random(53495); + + static private void loadPersons(Statement st, int nr) throws SQLException { + st.executeUpdate("insert into persons values (" + nr + ", '" + createRandomName() + "', " + rng.nextInt(469946) + ")"); + } + + static private void loadProducts(Statement st, int nr) throws SQLException { + st.executeUpdate("insert into products values (" + nr + ", '" + createRandomName() + "', " + rng.nextInt(469946) + ")"); + } + + static private void loadRelationships(Statement st, int nr) throws SQLException { + st.executeUpdate("insert into relationships values (" + nr + ", '" + createRandomName() + "', " + rng.nextInt(469946) + ")"); + } + + static private String createRandomName() { + return createRandomString() + " " + createRandomString(); + } + + static private String createRandomString() { + + final int length = rng.nextInt(19); + final String characters = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"; + + final char[] text = new char[length]; + for (int i = 0; i < length; i++) { + text[i] = characters.charAt(rng.nextInt(characters.length())); + } + return new String(text); + } + + private Connection createConnection() throws ClassNotFoundException, SQLException { + Class.forName("org.apache.derby.jdbc.EmbeddedDriver"); + final Connection con = DriverManager.getConnection("jdbc:derby:" + DB_LOCATION + ";create=true"); + return con; + } + +} diff --git a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/main/java/org/apache/nifi/dbcp/DBCPConnectionPool.java b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/main/java/org/apache/nifi/dbcp/DBCPConnectionPool.java index ef42be3289..9290e0d7fe 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/main/java/org/apache/nifi/dbcp/DBCPConnectionPool.java +++ b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/main/java/org/apache/nifi/dbcp/DBCPConnectionPool.java @@ -41,84 +41,59 @@ import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.reporting.InitializationException; /** - * Implementation of for Database Connection Pooling Service. - * Apache DBCP is used for connection pooling functionality. + * Implementation of for Database Connection Pooling Service. Apache DBCP is used for connection pooling functionality. * */ -@Tags({"dbcp", "jdbc", "database", "connection", "pooling", "store"}) +@Tags({ "dbcp", "jdbc", "database", "connection", "pooling", "store" }) @CapabilityDescription("Provides Database Connection Pooling Service. Connections can be asked from pool and returned after usage.") public class DBCPConnectionPool extends AbstractControllerService implements DBCPService { - public static final DatabaseSystemDescriptor DEFAULT_DATABASE_SYSTEM = DatabaseSystems.getDescriptor("JavaDB"); - - public static final PropertyDescriptor DATABASE_SYSTEM = new PropertyDescriptor.Builder() - .name("Database Type") - .description("Database management system") - .allowableValues(DatabaseSystems.knownDatabaseSystems) - .defaultValue(DEFAULT_DATABASE_SYSTEM.getValue()) - .required(true) - .build(); - - public static final PropertyDescriptor DB_HOST = new PropertyDescriptor.Builder() - .name("Database Host") - .description("Database Host") - .defaultValue(null) - .required(true) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .build(); - - public static final PropertyDescriptor DB_PORT = new PropertyDescriptor.Builder() - .name("Database Port") - .description("Database server port") - .required(true) - .addValidator(StandardValidators.PORT_VALIDATOR) - .build(); + public static final PropertyDescriptor DATABASE_URL = new PropertyDescriptor.Builder() + .name("Database Connection URL") + .description("A database connection URL used to connect to a database. May contain database system name, host, port, database name and some parameters." + + " The exact syntax of a database connection URL is specified by your DBMS.") + .defaultValue(null) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .required(true) + .build(); public static final PropertyDescriptor DB_DRIVERNAME = new PropertyDescriptor.Builder() - .name("Database Driver Class Name") - .description("Database driver class name") - .defaultValue(DEFAULT_DATABASE_SYSTEM.driverClassName) - .required(true) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .build(); + .name("Database Driver Class Name") + .description("Database driver class name") + .defaultValue(null) + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); public static final PropertyDescriptor DB_DRIVER_JAR_URL = new PropertyDescriptor.Builder() - .name("Database Driver Jar Url") - .description("Optional database driver jar file path url. For example 'file:///var/tmp/mariadb-java-client-1.1.7.jar'") - .defaultValue(null) - .required(false) - .addValidator(StandardValidators.URL_VALIDATOR) - .build(); - - public static final PropertyDescriptor DB_NAME = new PropertyDescriptor.Builder() - .name("Database Name") - .description("Database name") - .defaultValue(null) - .required(true) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .build(); + .name("Database Driver Jar Url") + .description("Optional database driver jar file path url. For example 'file:///var/tmp/mariadb-java-client-1.1.7.jar'") + .defaultValue(null) + .required(false) + .addValidator(StandardValidators.URL_VALIDATOR) + .build(); public static final PropertyDescriptor DB_USER = new PropertyDescriptor.Builder() - .name("Database User") - .description("Database user name") - .defaultValue(null) - .required(true) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .build(); + .name("Database User") + .description("Database user name") + .defaultValue(null) + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); public static final PropertyDescriptor DB_PASSWORD = new PropertyDescriptor.Builder() - .name("Password") - .description("The password for the database user") - .defaultValue(null) - .required(true) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .sensitive(true) - .build(); + .name("Password") + .description("The password for the database user") + .defaultValue(null) + .required(false) + .sensitive(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); public static final PropertyDescriptor MAX_WAIT_TIME = new PropertyDescriptor.Builder() - .name("Max Wait Time") - .description("The maximum amount of time that the pool will wait (when there are no available connections) " - + " for a connection to be returned before failing, or -1 to wait indefinitely. ") + .name("Max Wait Time") + .description("The maximum amount of time that the pool will wait (when there are no available connections) " + + " for a connection to be returned before failing, or -1 to wait indefinitely. ") .defaultValue("500 millis") .required(true) .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) @@ -126,9 +101,9 @@ public class DBCPConnectionPool extends AbstractControllerService implements DBC .build(); public static final PropertyDescriptor MAX_TOTAL_CONNECTIONS = new PropertyDescriptor.Builder() - .name("Max Total Connections") - .description("The maximum number of active connections that can be allocated from this pool at the same time, " - + " or negative for no limit.") + .name("Max Total Connections") + .description("The maximum number of active connections that can be allocated from this pool at the same time, " + + " or negative for no limit.") .defaultValue("8") .required(true) .addValidator(StandardValidators.INTEGER_VALIDATOR) @@ -138,13 +113,10 @@ public class DBCPConnectionPool extends AbstractControllerService implements DBC private static final List properties; static { - List props = new ArrayList<>(); - props.add(DATABASE_SYSTEM); - props.add(DB_HOST); - props.add(DB_PORT); + final List props = new ArrayList<>(); + props.add(DATABASE_URL); props.add(DB_DRIVERNAME); props.add(DB_DRIVER_JAR_URL); - props.add(DB_NAME); props.add(DB_USER); props.add(DB_PASSWORD); props.add(MAX_WAIT_TIME); @@ -162,30 +134,29 @@ public class DBCPConnectionPool extends AbstractControllerService implements DBC /** * Create new pool, open some connections ready to be used - * @param context the configuration context - * @throws InitializationException if unable to create a database connection + * + * @param context + * the configuration context + * @throws InitializationException + * if unable to create a database connection */ @OnEnabled public void onConfigured(final ConfigurationContext context) throws InitializationException { - DatabaseSystemDescriptor dbsystem = DatabaseSystems.getDescriptor( context.getProperty(DATABASE_SYSTEM).getValue() ); - String host = context.getProperty(DB_HOST).getValue(); - Integer port = context.getProperty(DB_PORT).asInteger(); - String drv = context.getProperty(DB_DRIVERNAME).getValue(); - String dbname = context.getProperty(DB_NAME).getValue(); - String user = context.getProperty(DB_USER).getValue(); - String passw = context.getProperty(DB_PASSWORD).getValue(); - Long maxWaitMillis = context.getProperty(MAX_WAIT_TIME).asTimePeriod(TimeUnit.MILLISECONDS); - Integer maxTotal = context.getProperty(MAX_TOTAL_CONNECTIONS).asInteger(); + final String drv = context.getProperty(DB_DRIVERNAME).getValue(); + final String user = context.getProperty(DB_USER).getValue(); + final String passw = context.getProperty(DB_PASSWORD).getValue(); + final Long maxWaitMillis = context.getProperty(MAX_WAIT_TIME).asTimePeriod(TimeUnit.MILLISECONDS); + final Integer maxTotal = context.getProperty(MAX_TOTAL_CONNECTIONS).asInteger(); dataSource = new BasicDataSource(); dataSource.setDriverClassName(drv); // Optional driver URL, when exist, this URL will be used to locate driver jar file location - String urlString = context.getProperty(DB_DRIVER_JAR_URL).getValue(); - dataSource.setDriverClassLoader( getDriverClassLoader(urlString, drv) ); + final String urlString = context.getProperty(DB_DRIVER_JAR_URL).getValue(); + dataSource.setDriverClassLoader(getDriverClassLoader(urlString, drv)); - String dburl = dbsystem.buildUrl(host, port, dbname); + final String dburl = context.getProperty(DATABASE_URL).getValue(); dataSource.setMaxWait(maxWaitMillis); dataSource.setMaxActive(maxTotal); @@ -196,40 +167,41 @@ public class DBCPConnectionPool extends AbstractControllerService implements DBC // verify connection can be established. try { - Connection con = dataSource.getConnection(); - if (con==null) { + final Connection con = dataSource.getConnection(); + if (con == null) { throw new InitializationException("Connection to database cannot be established."); } con.close(); - } catch (SQLException e) { + } catch (final SQLException e) { throw new InitializationException(e); } } /** - * using Thread.currentThread().getContextClassLoader(); - * will ensure that you are using the ClassLoader for you NAR. - * @throws InitializationException if there is a problem obtaining the ClassLoader + * using Thread.currentThread().getContextClassLoader(); will ensure that you are using the ClassLoader for you NAR. + * + * @throws InitializationException + * if there is a problem obtaining the ClassLoader */ protected ClassLoader getDriverClassLoader(String urlString, String drvName) throws InitializationException { - if (urlString!=null && urlString.length()>0) { + if (urlString != null && urlString.length() > 0) { try { - URL[] urls = new URL[] { new URL(urlString) }; - URLClassLoader ucl = new URLClassLoader(urls); + final URL[] urls = new URL[] { new URL(urlString) }; + final URLClassLoader ucl = new URLClassLoader(urls); // Workaround which allows to use URLClassLoader for JDBC driver loading. // (Because the DriverManager will refuse to use a driver not loaded by the system ClassLoader.) - Class clazz = Class.forName(drvName, true, ucl); - if (clazz==null) { + final Class clazz = Class.forName(drvName, true, ucl); + if (clazz == null) { throw new InitializationException("Can't load Database Driver " + drvName); } - Driver driver = (Driver) clazz.newInstance(); - DriverManager.registerDriver( new DriverShim(driver) ); + final Driver driver = (Driver) clazz.newInstance(); + DriverManager.registerDriver(new DriverShim(driver)); return ucl; - } catch (MalformedURLException e) { + } catch (final MalformedURLException e) { throw new InitializationException("Invalid Database Driver Jar Url", e); - } catch (Exception e) { + } catch (final Exception e) { throw new InitializationException("Can't load Database Driver", e); } } else { @@ -239,24 +211,23 @@ public class DBCPConnectionPool extends AbstractControllerService implements DBC } /** - * Shutdown pool, close all open connections. + * Shutdown pool, close all open connections. */ @OnDisabled public void shutdown() { try { dataSource.close(); - } catch (SQLException e) { + } catch (final SQLException e) { throw new ProcessException(e); } } - @Override public Connection getConnection() throws ProcessException { try { - Connection con = dataSource.getConnection(); + final Connection con = dataSource.getConnection(); return con; - } catch (SQLException e) { + } catch (final SQLException e) { throw new ProcessException(e); } } diff --git a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/main/java/org/apache/nifi/dbcp/DatabaseSystemDescriptor.java b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/main/java/org/apache/nifi/dbcp/DatabaseSystemDescriptor.java index fd45421f84..00e30cae6f 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/main/java/org/apache/nifi/dbcp/DatabaseSystemDescriptor.java +++ b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/main/java/org/apache/nifi/dbcp/DatabaseSystemDescriptor.java @@ -24,6 +24,7 @@ import org.apache.nifi.components.AllowableValue; * An immutable object for holding information about a database system. * */ +@Deprecated public class DatabaseSystemDescriptor extends AllowableValue { public final String driverClassName; diff --git a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/main/java/org/apache/nifi/dbcp/DatabaseSystems.java b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/main/java/org/apache/nifi/dbcp/DatabaseSystems.java index 2d21cd7216..943f6d2fcb 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/main/java/org/apache/nifi/dbcp/DatabaseSystems.java +++ b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/main/java/org/apache/nifi/dbcp/DatabaseSystems.java @@ -16,6 +16,7 @@ */ package org.apache.nifi.dbcp; +@Deprecated public class DatabaseSystems { /** diff --git a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/test/java/org/apache/nifi/dbcp/DBCPServiceTest.java b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/test/java/org/apache/nifi/dbcp/DBCPServiceTest.java index 541df8f606..6683d2a1f1 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/test/java/org/apache/nifi/dbcp/DBCPServiceTest.java +++ b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/test/java/org/apache/nifi/dbcp/DBCPServiceTest.java @@ -16,7 +16,6 @@ */ package org.apache.nifi.dbcp; -import static org.apache.nifi.dbcp.DatabaseSystems.getDescriptor; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; @@ -53,23 +52,8 @@ public class DBCPServiceTest { System.setProperty("derby.stream.error.file", "target/derby.log"); } - /** - * Unknown database system. - * - */ - @Test - public void testUnknownDatabaseSystem() throws InitializationException { - final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class); - final DBCPConnectionPool service = new DBCPConnectionPool(); - final Map properties = new HashMap(); - properties.put(DBCPConnectionPool.DATABASE_SYSTEM.getName(), "garbage"); - runner.addControllerService("test-bad2", service, properties); - runner.assertNotValid(service); - } - - /** - * Missing property values. + * Missing property values. */ @Test public void testMissingPropertyValues() throws InitializationException { @@ -81,8 +65,7 @@ public class DBCPServiceTest { } /** - * Test database connection using Derby. - * Connect, create table, insert, select, drop table. + * Test database connection using Derby. Connect, create table, insert, select, drop table. * */ @Test @@ -92,25 +75,21 @@ public class DBCPServiceTest { runner.addControllerService("test-good1", service); // remove previous test database, if any - File dbLocation = new File(DB_LOCATION); + final File dbLocation = new File(DB_LOCATION); dbLocation.delete(); - // Should setProperty call also generate DBCPConnectionPool.onPropertyModified() method call? - // It does not currently. - - // Some properties already should have JavaDB/Derby default values, let's set only missing values. - runner.setProperty(service, DBCPConnectionPool.DB_HOST, "NA"); // Embedded Derby don't use host - runner.setProperty(service, DBCPConnectionPool.DB_PORT, "1"); // Embedded Derby don't use port, but must have value anyway - runner.setProperty(service, DBCPConnectionPool.DB_NAME, DB_LOCATION); - runner.setProperty(service, DBCPConnectionPool.DB_USER, "tester"); + // set embedded Derby database connection url + runner.setProperty(service, DBCPConnectionPool.DATABASE_URL, "jdbc:derby:" + DB_LOCATION + ";create=true"); + runner.setProperty(service, DBCPConnectionPool.DB_USER, "tester"); runner.setProperty(service, DBCPConnectionPool.DB_PASSWORD, "testerp"); + runner.setProperty(service, DBCPConnectionPool.DB_DRIVERNAME, "org.apache.derby.jdbc.EmbeddedDriver"); runner.enableControllerService(service); runner.assertValid(service); - DBCPService dbcpService = (DBCPService) runner.getProcessContext().getControllerServiceLookup().getControllerService("test-good1"); + final DBCPService dbcpService = (DBCPService) runner.getProcessContext().getControllerServiceLookup().getControllerService("test-good1"); Assert.assertNotNull(dbcpService); - Connection connection = dbcpService.getConnection(); + final Connection connection = dbcpService.getConnection(); Assert.assertNotNull(connection); createInsertSelectDrop(connection); @@ -119,12 +98,9 @@ public class DBCPServiceTest { } /** - * NB!!!! - * Prerequisite: file should be present in /var/tmp/mariadb-java-client-1.1.7.jar - * Prerequisite: access to running MariaDb database server + * NB!!!! Prerequisite: file should be present in /var/tmp/mariadb-java-client-1.1.7.jar Prerequisite: access to running MariaDb database server * - * Test database connection using external JDBC jar located by URL. - * Connect, create table, insert, select, drop table. + * Test database connection using external JDBC jar located by URL. Connect, create table, insert, select, drop table. * */ @Ignore @@ -134,42 +110,32 @@ public class DBCPServiceTest { final DBCPConnectionPool service = new DBCPConnectionPool(); runner.addControllerService("test-external-jar", service); - DatabaseSystemDescriptor mariaDb = getDescriptor("MariaDB"); - assertNotNull(mariaDb); - - // Set MariaDB properties values. - runner.setProperty(service, DBCPConnectionPool.DATABASE_SYSTEM, mariaDb.getValue()); - runner.setProperty(service, DBCPConnectionPool.DB_PORT, mariaDb.defaultPort.toString()); - runner.setProperty(service, DBCPConnectionPool.DB_DRIVERNAME, mariaDb.driverClassName); + // set MariaDB database connection url + runner.setProperty(service, DBCPConnectionPool.DATABASE_URL, "jdbc:mariadb://localhost:3306/" + "testdb"); + runner.setProperty(service, DBCPConnectionPool.DB_DRIVERNAME, "org.mariadb.jdbc.Driver"); runner.setProperty(service, DBCPConnectionPool.DB_DRIVER_JAR_URL, "file:///var/tmp/mariadb-java-client-1.1.7.jar"); - - runner.setProperty(service, DBCPConnectionPool.DB_HOST, "localhost"); // localhost - runner.setProperty(service, DBCPConnectionPool.DB_NAME, "testdb"); - runner.setProperty(service, DBCPConnectionPool.DB_USER, "tester"); + runner.setProperty(service, DBCPConnectionPool.DB_USER, "tester"); runner.setProperty(service, DBCPConnectionPool.DB_PASSWORD, "testerp"); runner.enableControllerService(service); runner.assertValid(service); - DBCPService dbcpService = (DBCPService) runner.getProcessContext().getControllerServiceLookup().getControllerService("test-external-jar"); + final DBCPService dbcpService = (DBCPService) runner.getProcessContext().getControllerServiceLookup().getControllerService("test-external-jar"); Assert.assertNotNull(dbcpService); - Connection connection = dbcpService.getConnection(); + final Connection connection = dbcpService.getConnection(); Assert.assertNotNull(connection); createInsertSelectDrop(connection); - connection.close(); // return to pool + connection.close(); // return to pool } - @Rule public ExpectedException exception = ExpectedException.none(); /** - * Test get database connection using Derby. - * Get many times, after a while pool should not contain any available connection - * and getConnection should fail. + * Test get database connection using Derby. Get many times, after a while pool should not contain any available connection and getConnection should fail. */ @Test public void testExhaustPool() throws InitializationException, SQLException { @@ -178,33 +144,30 @@ public class DBCPServiceTest { runner.addControllerService("test-exhaust", service); // remove previous test database, if any - File dbLocation = new File(DB_LOCATION); + final File dbLocation = new File(DB_LOCATION); dbLocation.delete(); - runner.setProperty(service, DBCPConnectionPool.DB_HOST, "NA"); // Embedded Derby don't use host - runner.setProperty(service, DBCPConnectionPool.DB_PORT, "1"); // Embedded Derby don't use port, but must have value anyway - runner.setProperty(service, DBCPConnectionPool.DB_NAME, DB_LOCATION); + // set embedded Derby database connection url + runner.setProperty(service, DBCPConnectionPool.DATABASE_URL, "jdbc:derby:" + DB_LOCATION + ";create=true"); runner.setProperty(service, DBCPConnectionPool.DB_USER, "tester"); - runner.setProperty(service, DBCPConnectionPool.DB_PASSWORD, "testerp"); + runner.setProperty(service, DBCPConnectionPool.DB_DRIVERNAME, "org.apache.derby.jdbc.EmbeddedDriver"); runner.enableControllerService(service); runner.assertValid(service); - DBCPService dbcpService = (DBCPService) runner.getProcessContext().getControllerServiceLookup().getControllerService("test-exhaust"); + final DBCPService dbcpService = (DBCPService) runner.getProcessContext().getControllerServiceLookup().getControllerService("test-exhaust"); Assert.assertNotNull(dbcpService); exception.expect(ProcessException.class); exception.expectMessage("Cannot get a connection, pool error Timeout waiting for idle object"); for (int i = 0; i < 100; i++) { - Connection connection = dbcpService.getConnection(); + final Connection connection = dbcpService.getConnection(); Assert.assertNotNull(connection); } } /** - * Test get database connection using Derby. - * Get many times, release immediately - * and getConnection should not fail. + * Test get database connection using Derby. Get many times, release immediately and getConnection should not fail. */ @Test public void testGetManyNormal() throws InitializationException, SQLException { @@ -213,103 +176,98 @@ public class DBCPServiceTest { runner.addControllerService("test-exhaust", service); // remove previous test database, if any - File dbLocation = new File(DB_LOCATION); + final File dbLocation = new File(DB_LOCATION); dbLocation.delete(); - runner.setProperty(service, DBCPConnectionPool.DB_HOST, "NA"); // Embedded Derby don't use host - runner.setProperty(service, DBCPConnectionPool.DB_PORT, "1"); // Embedded Derby don't use port, but must have value anyway - runner.setProperty(service, DBCPConnectionPool.DB_NAME, DB_LOCATION); + // set embedded Derby database connection url + runner.setProperty(service, DBCPConnectionPool.DATABASE_URL, "jdbc:derby:" + DB_LOCATION + ";create=true"); runner.setProperty(service, DBCPConnectionPool.DB_USER, "tester"); runner.setProperty(service, DBCPConnectionPool.DB_PASSWORD, "testerp"); + runner.setProperty(service, DBCPConnectionPool.DB_DRIVERNAME, "org.apache.derby.jdbc.EmbeddedDriver"); runner.enableControllerService(service); runner.assertValid(service); - DBCPService dbcpService = (DBCPService) runner.getProcessContext().getControllerServiceLookup().getControllerService("test-exhaust"); + final DBCPService dbcpService = (DBCPService) runner.getProcessContext().getControllerServiceLookup().getControllerService("test-exhaust"); Assert.assertNotNull(dbcpService); for (int i = 0; i < 1000; i++) { - Connection connection = dbcpService.getConnection(); + final Connection connection = dbcpService.getConnection(); Assert.assertNotNull(connection); - connection.close(); // will return connection to pool + connection.close(); // will return connection to pool } } - @Test public void testDriverLoad() throws ClassNotFoundException { - Class clazz = Class.forName("org.apache.derby.jdbc.EmbeddedDriver"); + final Class clazz = Class.forName("org.apache.derby.jdbc.EmbeddedDriver"); assertNotNull(clazz); } /** - * NB!!!! - * Prerequisite: file should be present in /var/tmp/mariadb-java-client-1.1.7.jar + * NB!!!! Prerequisite: file should be present in /var/tmp/mariadb-java-client-1.1.7.jar */ @Test @Ignore("Intended only for local testing, not automated testing") public void testURLClassLoader() throws ClassNotFoundException, MalformedURLException, SQLException, InstantiationException, IllegalAccessException { - URL url = new URL("file:///var/tmp/mariadb-java-client-1.1.7.jar"); - URL[] urls = new URL[] { url }; + final URL url = new URL("file:///var/tmp/mariadb-java-client-1.1.7.jar"); + final URL[] urls = new URL[] { url }; - ClassLoader parent = Thread.currentThread().getContextClassLoader(); - URLClassLoader ucl = new URLClassLoader(urls,parent); + final ClassLoader parent = Thread.currentThread().getContextClassLoader(); + final URLClassLoader ucl = new URLClassLoader(urls, parent); - Class clazz = Class.forName("org.mariadb.jdbc.Driver", true, ucl); + final Class clazz = Class.forName("org.mariadb.jdbc.Driver", true, ucl); assertNotNull(clazz); - Driver driver = (Driver) clazz.newInstance(); - Driver shim = new DriverShim(driver); - DriverManager.registerDriver( shim ); + final Driver driver = (Driver) clazz.newInstance(); + final Driver shim = new DriverShim(driver); + DriverManager.registerDriver(shim); - Driver driver2 = DriverManager.getDriver("jdbc:mariadb://localhost:3306/testdb"); + final Driver driver2 = DriverManager.getDriver("jdbc:mariadb://localhost:3306/testdb"); assertNotNull(driver2); } /** - * NB!!!! - * Prerequisite: file should be present in /var/tmp/mariadb-java-client-1.1.7.jar - * Prerequisite: access to running MariaDb database server + * NB!!!! Prerequisite: file should be present in /var/tmp/mariadb-java-client-1.1.7.jar Prerequisite: access to running MariaDb database server */ @Test @Ignore("Intended only for local testing, not automated testing") public void testURLClassLoaderGetConnection() throws ClassNotFoundException, MalformedURLException, SQLException, InstantiationException, IllegalAccessException { - URL url = new URL("file:///var/tmp/mariadb-java-client-1.1.7.jar"); - URL[] urls = new URL[] { url }; + final URL url = new URL("file:///var/tmp/mariadb-java-client-1.1.7.jar"); + final URL[] urls = new URL[] { url }; - ClassLoader parent = Thread.currentThread().getContextClassLoader(); - URLClassLoader ucl = new URLClassLoader(urls,parent); + final ClassLoader parent = Thread.currentThread().getContextClassLoader(); + final URLClassLoader ucl = new URLClassLoader(urls, parent); - Class clazz = Class.forName("org.mariadb.jdbc.Driver", true, ucl); + final Class clazz = Class.forName("org.mariadb.jdbc.Driver", true, ucl); assertNotNull(clazz); - Driver driver = (Driver) clazz.newInstance(); - Driver shim = new DriverShim(driver); - DriverManager.registerDriver( shim ); + final Driver driver = (Driver) clazz.newInstance(); + final Driver shim = new DriverShim(driver); + DriverManager.registerDriver(shim); - Driver driver2 = DriverManager.getDriver("jdbc:mariadb://localhost:3306/testdb"); + final Driver driver2 = DriverManager.getDriver("jdbc:mariadb://localhost:3306/testdb"); assertNotNull(driver2); - Connection connection = DriverManager.getConnection("jdbc:mariadb://localhost:3306/testdb","tester","testerp"); + final Connection connection = DriverManager.getConnection("jdbc:mariadb://localhost:3306/testdb", "tester", "testerp"); assertNotNull(connection); connection.close(); DriverManager.deregisterDriver(shim); } - String createTable = "create table restaurants(id integer, name varchar(20), city varchar(50))"; String dropTable = "drop table restaurants"; - protected void createInsertSelectDrop( Connection con) throws SQLException { + protected void createInsertSelectDrop(Connection con) throws SQLException { - Statement st = con.createStatement(); + final Statement st = con.createStatement(); try { st.executeUpdate(dropTable); - } catch (Exception e) { + } catch (final Exception e) { // table may not exist, this is not serious problem. } @@ -320,7 +278,7 @@ public class DBCPServiceTest { st.executeUpdate("insert into restaurants values (3, 'Prime Rib House', 'San Francisco')"); int nrOfRows = 0; - ResultSet resultSet = st.executeQuery("select * from restaurants"); + final ResultSet resultSet = st.executeQuery("select * from restaurants"); while (resultSet.next()) nrOfRows++; assertEquals(3, nrOfRows); diff --git a/nifi/pom.xml b/nifi/pom.xml index 6cc766683e..0c71ba8ae5 100644 --- a/nifi/pom.xml +++ b/nifi/pom.xml @@ -847,11 +847,21 @@ nifi-dbcp-service 0.2.0-incubating-SNAPSHOT + + org.apache.nifi + nifi-dbcp-service-api + 0.2.0-incubating-SNAPSHOT + com.jayway.jsonpath json-path 2.0.0 + + org.apache.derby + derby + 10.11.1.1 +