Merge branch 'NIFI-626' into develop

This commit is contained in:
Mark Payne 2015-06-17 13:33:08 -04:00
commit 0af9c75d78
12 changed files with 1344 additions and 476 deletions

View File

@ -1,19 +1,16 @@
<?xml version="1.0"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
license agreements. See the NOTICE file distributed with this work for additional
information regarding copyright ownership. The ASF licenses this file to
You under the Apache License, Version 2.0 (the "License"); you may not use
this file except in compliance with the License. You may obtain a copy of
the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
by applicable law or agreed to in writing, software distributed under the
License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
OF ANY KIND, either express or implied. See the License for the specific
language governing permissions and limitations under the License. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
@ -177,6 +174,22 @@
<artifactId>jackson-databind</artifactId>
<version>2.4.5</version>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-dbcp-service-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.derby</groupId>
<artifactId>derby</artifactId>
<version>10.11.1.1</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>

View File

@ -0,0 +1,157 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard;
import java.io.IOException;
import java.io.OutputStream;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.dbcp.DBCPService;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ProcessorLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.standard.util.JdbcCommon;
import org.apache.nifi.util.LongHolder;
import org.apache.nifi.util.StopWatch;
@EventDriven
@Tags({ "sql", "select", "jdbc", "query", "database" })
@CapabilityDescription("Execute provided SQL select query. Query result will be converted to Avro format."
+ " Streaming is used so arbitrarily large result sets are supported.")
public class ExecuteSQL extends AbstractProcessor {
// Relationships
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("Successfully created FlowFile from SQL query result set.")
.build();
public static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description("SQL query execution failed. Incoming FlowFile will be penalized and routed to this relationship")
.build();
private final Set<Relationship> relationships;
public static final PropertyDescriptor DBCP_SERVICE = new PropertyDescriptor.Builder()
.name("Database Connection Pooling Service")
.description("The Controller Service that is used to obtain connection to database")
.required(true)
.identifiesControllerService(DBCPService.class)
.build();
public static final PropertyDescriptor SQL_SELECT_QUERY = new PropertyDescriptor.Builder()
.name("SQL select query")
.description("SQL select query")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(true)
.build();
public static final PropertyDescriptor QUERY_TIMEOUT = new PropertyDescriptor.Builder()
.name("Max Wait Time")
.description("The maximum amount of time allowed for a running SQL select query "
+ " , zero means there is no limit. Max time less than 1 second will be equal to zero.")
.defaultValue("0 seconds")
.required(true)
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.sensitive(false)
.build();
private final List<PropertyDescriptor> propDescriptors;
public ExecuteSQL() {
final Set<Relationship> r = new HashSet<>();
r.add(REL_SUCCESS);
relationships = Collections.unmodifiableSet(r);
final List<PropertyDescriptor> pds = new ArrayList<>();
pds.add(DBCP_SERVICE);
pds.add(SQL_SELECT_QUERY);
pds.add(QUERY_TIMEOUT);
propDescriptors = Collections.unmodifiableList(pds);
}
@Override
public Set<Relationship> getRelationships() {
return relationships;
}
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return propDescriptors;
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
final FlowFile incoming = session.get();
if (incoming == null) {
return;
}
final ProcessorLog logger = getLogger();
final DBCPService dbcpService = context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class);
final String selectQuery = context.getProperty(SQL_SELECT_QUERY).evaluateAttributeExpressions(incoming).getValue();
final Integer queryTimeout = context.getProperty(QUERY_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).intValue();
final StopWatch stopWatch = new StopWatch(true);
try (final Connection con = dbcpService.getConnection();
final Statement st = con.createStatement()) {
st.setQueryTimeout(queryTimeout); // timeout in seconds
final LongHolder nrOfRows = new LongHolder(0L);
final FlowFile outgoing = session.write(incoming, new OutputStreamCallback() {
@Override
public void process(final OutputStream out) throws IOException {
try {
logger.debug("Executing query {}", new Object[] { selectQuery });
final ResultSet resultSet = st.executeQuery(selectQuery);
nrOfRows.set(JdbcCommon.convertToAvroStream(resultSet, out));
} catch (final SQLException e) {
throw new ProcessException(e);
}
}
});
logger.info("{} contains {} Avro records", new Object[] { nrOfRows.get() });
logger.info("Transferred {} to 'success'", new Object[] { outgoing });
session.getProvenanceReporter().modifyContent(outgoing, "Retrieved " + nrOfRows.get() + " rows", stopWatch.getElapsed(TimeUnit.MILLISECONDS));
session.transfer(outgoing, REL_SUCCESS);
} catch (final ProcessException | SQLException e) {
logger.error("Unable to execute SQL select query {} for {} due to {}; routing to failure", new Object[] { selectQuery, incoming, e });
session.transfer(incoming, REL_FAILURE);
}
}
}

View File

@ -0,0 +1,165 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard.util;
import static java.sql.Types.BIGINT;
import static java.sql.Types.BOOLEAN;
import static java.sql.Types.CHAR;
import static java.sql.Types.DATE;
import static java.sql.Types.DECIMAL;
import static java.sql.Types.DOUBLE;
import static java.sql.Types.FLOAT;
import static java.sql.Types.INTEGER;
import static java.sql.Types.LONGNVARCHAR;
import static java.sql.Types.LONGVARCHAR;
import static java.sql.Types.NCHAR;
import static java.sql.Types.NUMERIC;
import static java.sql.Types.NVARCHAR;
import static java.sql.Types.REAL;
import static java.sql.Types.ROWID;
import static java.sql.Types.SMALLINT;
import static java.sql.Types.TIME;
import static java.sql.Types.TIMESTAMP;
import static java.sql.Types.TINYINT;
import static java.sql.Types.VARCHAR;
import java.io.IOException;
import java.io.OutputStream;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.SchemaBuilder.FieldAssembler;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumWriter;
/**
* JDBC / SQL common functions.
*/
public class JdbcCommon {
public static long convertToAvroStream(final ResultSet rs, final OutputStream outStream) throws SQLException, IOException {
final Schema schema = createSchema(rs);
final GenericRecord rec = new GenericData.Record(schema);
final DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<GenericRecord>(schema);
try (final DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<GenericRecord>(datumWriter)) {
dataFileWriter.create(schema, outStream);
final ResultSetMetaData meta = rs.getMetaData();
final int nrOfColumns = meta.getColumnCount();
long nrOfRows = 0;
while (rs.next()) {
for (int i = 1; i <= nrOfColumns; i++) {
final Object value = rs.getObject(i);
// The different types that we support are numbers (int, long, double, float),
// as well as boolean values and Strings. Since Avro doesn't provide
// timestamp types, we want to convert those to Strings. So we will cast anything other
// than numbers or booleans to strings by using to toString() method.
if (value == null) {
rec.put(i - 1, null);
} else if (value instanceof Number || value instanceof Boolean) {
rec.put(i - 1, value);
} else {
rec.put(i - 1, value.toString());
}
}
dataFileWriter.append(rec);
nrOfRows += 1;
}
return nrOfRows;
}
}
public static Schema createSchema(final ResultSet rs) throws SQLException {
final ResultSetMetaData meta = rs.getMetaData();
final int nrOfColumns = meta.getColumnCount();
final String tableName = meta.getTableName(1);
final FieldAssembler<Schema> builder = SchemaBuilder.record(tableName).namespace("any.data").fields();
/**
* Some missing Avro types - Decimal, Date types. May need some additional work.
*/
for (int i = 1; i <= nrOfColumns; i++) {
switch (meta.getColumnType(i)) {
case CHAR:
case LONGNVARCHAR:
case LONGVARCHAR:
case NCHAR:
case NVARCHAR:
case VARCHAR:
builder.name(meta.getColumnName(i)).type().stringType().noDefault();
break;
case BOOLEAN:
builder.name(meta.getColumnName(i)).type().booleanType().noDefault();
break;
case INTEGER:
case SMALLINT:
case TINYINT:
builder.name(meta.getColumnName(i)).type().intType().noDefault();
break;
case BIGINT:
builder.name(meta.getColumnName(i)).type().longType().noDefault();
break;
// java.sql.RowId is interface, is seems to be database
// implementation specific, let's convert to String
case ROWID:
builder.name(meta.getColumnName(i)).type().stringType().noDefault();
break;
case FLOAT:
case REAL:
builder.name(meta.getColumnName(i)).type().floatType().noDefault();
break;
case DOUBLE:
builder.name(meta.getColumnName(i)).type().doubleType().noDefault();
break;
// Did not find direct suitable type, need to be clarified!!!!
case DECIMAL:
case NUMERIC:
builder.name(meta.getColumnName(i)).type().stringType().noDefault();
break;
// Did not find direct suitable type, need to be clarified!!!!
case DATE:
case TIME:
case TIMESTAMP:
builder.name(meta.getColumnName(i)).type().stringType().noDefault();
break;
default:
break;
}
}
return builder.endRecord();
}
}

View File

@ -66,3 +66,4 @@ org.apache.nifi.processors.standard.SplitXml
org.apache.nifi.processors.standard.TransformXml
org.apache.nifi.processors.standard.UnpackContent
org.apache.nifi.processors.standard.ValidateXml
org.apache.nifi.processors.standard.ExecuteSQL

View File

@ -0,0 +1,169 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard;
import static org.junit.Assert.assertEquals;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.dbcp.DBCPService;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processors.standard.util.TestJdbcHugeStream;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.fusesource.hawtbuf.ByteArrayInputStream;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class TestExecuteSQL {
private static Logger LOGGER;
static {
System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "info");
System.setProperty("org.slf4j.simpleLogger.showDateTime", "true");
System.setProperty("org.slf4j.simpleLogger.log.nifi.io.nio", "debug");
System.setProperty("org.slf4j.simpleLogger.log.nifi.processors.standard.ExecuteSQL", "debug");
System.setProperty("org.slf4j.simpleLogger.log.nifi.processors.standard.TestExecuteSQL", "debug");
LOGGER = LoggerFactory.getLogger(TestExecuteSQL.class);
}
final static String DB_LOCATION = "target/db";
@BeforeClass
public static void setup() {
System.setProperty("derby.stream.error.file", "target/derby.log");
}
@Test
public void testNoTimeLimit() throws InitializationException, ClassNotFoundException, SQLException, IOException {
invokeOnTrigger(null);
}
@Test
public void testQueryTimeout() throws InitializationException, ClassNotFoundException, SQLException, IOException {
// Does to seem to have any effect when using embedded Derby
invokeOnTrigger(1); // 1 second max time
}
public void invokeOnTrigger(final Integer queryTimeout) throws InitializationException, ClassNotFoundException, SQLException, IOException {
final TestRunner runner = TestRunners.newTestRunner(ExecuteSQL.class);
final DBCPService dbcp = new DBCPServiceSimpleImpl();
final Map<String, String> dbcpProperties = new HashMap<>();
runner.addControllerService("dbcp", dbcp, dbcpProperties);
runner.enableControllerService(dbcp);
runner.setProperty(ExecuteSQL.DBCP_SERVICE, "dbcp");
if (queryTimeout != null) {
runner.setProperty(ExecuteSQL.QUERY_TIMEOUT, queryTimeout.toString() + " secs");
}
// remove previous test database, if any
final File dbLocation = new File(DB_LOCATION);
dbLocation.delete();
// load test data to database
final Connection con = dbcp.getConnection();
TestJdbcHugeStream.loadTestData2Database(con, 100, 2000, 1000);
LOGGER.info("test data loaded");
// ResultSet size will be 1x2000x1000 = 2 000 000 rows
// because of where PER.ID = ${person.id}
final int nrOfRows = 2000000;
final String query = "select "
+ " PER.ID as PersonId, PER.NAME as PersonName, PER.CODE as PersonCode"
+ ", PRD.ID as ProductId,PRD.NAME as ProductName,PRD.CODE as ProductCode"
+ ", REL.ID as RelId, REL.NAME as RelName, REL.CODE as RelCode"
+ ", ROW_NUMBER() OVER () as rownr "
+ " from persons PER, products PRD, relationships REL"
+ " where PER.ID = ${person.id}";
runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, query);
// incoming FlowFile content is not used, but attributes are used
final Map<String, String> attributes = new HashMap<String, String>();
attributes.put("person.id", "10");
runner.enqueue("Hello".getBytes(), attributes);
runner.run();
runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_SUCCESS, 1);
// read all Avro records and verify created FlowFile contains 1000000
// records
final List<MockFlowFile> flowfiles = runner.getFlowFilesForRelationship(ExecuteSQL.REL_SUCCESS);
final InputStream in = new ByteArrayInputStream(flowfiles.get(0).toByteArray());
final DatumReader<GenericRecord> datumReader = new GenericDatumReader<GenericRecord>();
final DataFileStream<GenericRecord> dataFileReader = new DataFileStream<GenericRecord>(in, datumReader);
GenericRecord record = null;
long recordsFromStream = 0;
while (dataFileReader.hasNext()) {
// Reuse record object by passing it to next(). This saves us from
// allocating and garbage collecting many objects for files with
// many items.
record = dataFileReader.next(record);
recordsFromStream += 1;
}
LOGGER.info("total nr of records from stream: " + recordsFromStream);
assertEquals(nrOfRows, recordsFromStream);
dataFileReader.close();
}
/**
* Simple implementation only for ExecuteSQL processor testing.
*
*/
class DBCPServiceSimpleImpl extends AbstractControllerService implements DBCPService {
@Override
public String getIdentifier() {
return "dbcp";
}
@Override
public Connection getConnection() throws ProcessException {
try {
Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
final Connection con = DriverManager.getConnection("jdbc:derby:" + DB_LOCATION + ";create=true");
return con;
} catch (final Exception e) {
throw new ProcessException("getConnection failed: " + e);
}
}
}
}

View File

@ -0,0 +1,155 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard.util;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.junit.BeforeClass;
import org.junit.Test;
public class TestJdbcCommon {
final static String DB_LOCATION = "target/db";
@BeforeClass
public static void setup() {
System.setProperty("derby.stream.error.file", "target/derby.log");
}
String createTable = "create table restaurants(id integer, name varchar(20), city varchar(50))";
String dropTable = "drop table restaurants";
@Test
public void testCreateSchema() throws ClassNotFoundException, SQLException {
// remove previous test database, if any
final File dbLocation = new File(DB_LOCATION);
dbLocation.delete();
final Connection con = createConnection();
final Statement st = con.createStatement();
try {
st.executeUpdate(dropTable);
} catch (final Exception e) {
// table may not exist, this is not serious problem.
}
st.executeUpdate(createTable);
st.executeUpdate("insert into restaurants values (1, 'Irifunes', 'San Mateo')");
st.executeUpdate("insert into restaurants values (2, 'Estradas', 'Daly City')");
st.executeUpdate("insert into restaurants values (3, 'Prime Rib House', 'San Francisco')");
final ResultSet resultSet = st.executeQuery("select * from restaurants");
final Schema schema = JdbcCommon.createSchema(resultSet);
assertNotNull(schema);
// records name, should be result set first column table name
// Notice! sql select may join data from different tables, other columns
// may have different table names
assertEquals("RESTAURANTS", schema.getName());
assertNotNull(schema.getField("ID"));
assertNotNull(schema.getField("NAME"));
assertNotNull(schema.getField("CITY"));
st.close();
con.close();
}
@Test
public void testConvertToBytes() throws ClassNotFoundException, SQLException, IOException {
// remove previous test database, if any
final File dbLocation = new File(DB_LOCATION);
dbLocation.delete();
final Connection con = createConnection();
final Statement st = con.createStatement();
try {
st.executeUpdate(dropTable);
} catch (final Exception e) {
// table may not exist, this is not serious problem.
}
st.executeUpdate(createTable);
st.executeUpdate("insert into restaurants values (1, 'Irifunes', 'San Mateo')");
st.executeUpdate("insert into restaurants values (2, 'Estradas', 'Daly City')");
st.executeUpdate("insert into restaurants values (3, 'Prime Rib House', 'San Francisco')");
final ResultSet resultSet = st.executeQuery("select R.*, ROW_NUMBER() OVER () as rownr from restaurants R");
final ByteArrayOutputStream outStream = new ByteArrayOutputStream();
JdbcCommon.convertToAvroStream(resultSet, outStream);
final byte[] serializedBytes = outStream.toByteArray();
assertNotNull(serializedBytes);
System.out.println("Avro serialized result size in bytes: " + serializedBytes.length);
st.close();
con.close();
// Deserialize bytes to records
final InputStream instream = new ByteArrayInputStream(serializedBytes);
final DatumReader<GenericRecord> datumReader = new GenericDatumReader<GenericRecord>();
try (final DataFileStream<GenericRecord> dataFileReader = new DataFileStream<GenericRecord>(instream, datumReader)) {
GenericRecord record = null;
while (dataFileReader.hasNext()) {
// Reuse record object by passing it to next(). This saves us from
// allocating and garbage collecting many objects for files with
// many items.
record = dataFileReader.next(record);
System.out.println(record);
}
}
}
// many test use Derby as database, so ensure driver is available
@Test
public void testDriverLoad() throws ClassNotFoundException {
final Class<?> clazz = Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
assertNotNull(clazz);
}
private Connection createConnection() throws ClassNotFoundException, SQLException {
Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
final Connection con = DriverManager.getConnection("jdbc:derby:" + DB_LOCATION + ";create=true");
return con;
}
}

View File

@ -0,0 +1,267 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard.util;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Random;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
/**
* Test streaming using large number of result set rows. 1. Read data from
* database. 2. Create Avro schema from ResultSet meta data. 3. Read rows from
* ResultSet and write rows to Avro writer stream (Avro will create record for
* each row). 4. And finally read records from Avro stream to verify all data is
* present in Avro stream.
*
*
* Sql query will return all combinations from 3 table. For example when each
* table contain 1000 rows, result set will be 1 000 000 000 rows.
*
*/
public class TestJdbcHugeStream {
final static String DB_LOCATION = "target/db";
@BeforeClass
public static void setup() {
System.setProperty("derby.stream.error.file", "target/derby.log");
}
/**
* In case of large record set this will fail with
* java.lang.OutOfMemoryError: Java heap space at
* java.util.Arrays.copyOf(Arrays.java:2271) at
* java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113) at
* java
* .io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
* at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140) at
* org .apache.avro.file.
* DataFileWriter$BufferedFileOutputStream$PositionFilter
* .write(DataFileWriter.java:446)
*
*/
@Test
@Ignore
public void readSend2StreamHuge_InMemory() throws ClassNotFoundException, SQLException, IOException {
// remove previous test database, if any
final File dbLocation = new File(DB_LOCATION);
dbLocation.delete();
try (final Connection con = createConnection()) {
loadTestData2Database(con, 150, 150, 150);
System.out.println("test data loaded");
try (final Statement st = con.createStatement()) {
// Notice!
// Following select is deliberately invalid!
// For testing we need huge amount of rows, so where part is not
// used.
final ResultSet resultSet = st.executeQuery("select "
+ " PER.ID as PersonId, PER.NAME as PersonName, PER.CODE as PersonCode"
+ ", PRD.ID as ProductId,PRD.NAME as ProductName,PRD.CODE as ProductCode"
+ ", REL.ID as RelId, REL.NAME as RelName, REL.CODE as RelCode"
+ ", ROW_NUMBER() OVER () as rownr "
+ " from persons PER, products PRD, relationships REL");
final ByteArrayOutputStream outStream = new ByteArrayOutputStream();
final long nrOfRows = JdbcCommon.convertToAvroStream(resultSet, outStream);
System.out.println("total nr of rows in resultset: " + nrOfRows);
final byte[] serializedBytes = outStream.toByteArray();
assertNotNull(serializedBytes);
System.out.println("Avro serialized result size in bytes: " + serializedBytes.length);
// Deserialize bytes to records
final InputStream instream = new ByteArrayInputStream(serializedBytes);
final DatumReader<GenericRecord> datumReader = new GenericDatumReader<GenericRecord>();
try (final DataFileStream<GenericRecord> dataFileReader = new DataFileStream<GenericRecord>(instream, datumReader)) {
GenericRecord record = null;
long recordsFromStream = 0;
while (dataFileReader.hasNext()) {
// Reuse record object by passing it to next(). This
// saves us from
// allocating and garbage collecting many objects for
// files with many items.
record = dataFileReader.next(record);
recordsFromStream += 1;
}
System.out.println("total nr of records from stream: " + recordsFromStream);
assertEquals(nrOfRows, recordsFromStream);
}
}
}
}
@Test
public void readSend2StreamHuge_FileBased() throws ClassNotFoundException, SQLException, IOException {
// remove previous test database, if any
final File dbLocation = new File(DB_LOCATION);
dbLocation.delete();
try (final Connection con = createConnection()) {
loadTestData2Database(con, 300, 300, 300);
try (final Statement st = con.createStatement()) {
// Notice!
// Following select is deliberately invalid!
// For testing we need huge amount of rows, so where part is not
// used.
final ResultSet resultSet = st.executeQuery("select "
+ " PER.ID as PersonId, PER.NAME as PersonName, PER.CODE as PersonCode"
+ ", PRD.ID as ProductId,PRD.NAME as ProductName,PRD.CODE as ProductCode"
+ ", REL.ID as RelId, REL.NAME as RelName, REL.CODE as RelCode"
+ ", ROW_NUMBER() OVER () as rownr "
+ " from persons PER, products PRD, relationships REL");
final OutputStream outStream = new FileOutputStream("target/data.avro");
final long nrOfRows = JdbcCommon.convertToAvroStream(resultSet, outStream);
// Deserialize bytes to records
final InputStream instream = new FileInputStream("target/data.avro");
final DatumReader<GenericRecord> datumReader = new GenericDatumReader<GenericRecord>();
try (final DataFileStream<GenericRecord> dataFileReader = new DataFileStream<GenericRecord>(instream, datumReader)) {
GenericRecord record = null;
long recordsFromStream = 0;
while (dataFileReader.hasNext()) {
// Reuse record object by passing it to next(). This
// saves us from
// allocating and garbage collecting many objects for
// files with many items.
record = dataFileReader.next(record);
recordsFromStream += 1;
}
System.out.println("total nr of records from stream: " + recordsFromStream);
assertEquals(nrOfRows, recordsFromStream);
}
}
}
}
// ================================================ helpers
// ===============================================
static String dropPersons = "drop table persons";
static String dropProducts = "drop table products";
static String dropRelationships = "drop table relationships";
static String createPersons = "create table persons (id integer, name varchar(100), code integer)";
static String createProducts = "create table products (id integer, name varchar(100), code integer)";
static String createRelationships = "create table relationships (id integer,name varchar(100), code integer)";
static public void loadTestData2Database(Connection con, int nrOfPersons, int nrOfProducts, int nrOfRels) throws ClassNotFoundException, SQLException {
System.out.println(createRandomName());
System.out.println(createRandomName());
System.out.println(createRandomName());
final Statement st = con.createStatement();
// tables may not exist, this is not serious problem.
try {
st.executeUpdate(dropPersons);
} catch (final Exception e) {
}
try {
st.executeUpdate(dropProducts);
} catch (final Exception e) {
}
try {
st.executeUpdate(dropRelationships);
} catch (final Exception e) {
}
st.executeUpdate(createPersons);
st.executeUpdate(createProducts);
st.executeUpdate(createRelationships);
for (int i = 0; i < nrOfPersons; i++)
loadPersons(st, i);
for (int i = 0; i < nrOfProducts; i++)
loadProducts(st, i);
for (int i = 0; i < nrOfRels; i++)
loadRelationships(st, i);
st.close();
}
static Random rng = new Random(53495);
static private void loadPersons(Statement st, int nr) throws SQLException {
st.executeUpdate("insert into persons values (" + nr + ", '" + createRandomName() + "', " + rng.nextInt(469946) + ")");
}
static private void loadProducts(Statement st, int nr) throws SQLException {
st.executeUpdate("insert into products values (" + nr + ", '" + createRandomName() + "', " + rng.nextInt(469946) + ")");
}
static private void loadRelationships(Statement st, int nr) throws SQLException {
st.executeUpdate("insert into relationships values (" + nr + ", '" + createRandomName() + "', " + rng.nextInt(469946) + ")");
}
static private String createRandomName() {
return createRandomString() + " " + createRandomString();
}
static private String createRandomString() {
final int length = rng.nextInt(19);
final String characters = "ABCDEFGHIJKLMNOPQRSTUVWXYZ";
final char[] text = new char[length];
for (int i = 0; i < length; i++) {
text[i] = characters.charAt(rng.nextInt(characters.length()));
}
return new String(text);
}
private Connection createConnection() throws ClassNotFoundException, SQLException {
Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
final Connection con = DriverManager.getConnection("jdbc:derby:" + DB_LOCATION + ";create=true");
return con;
}
}

View File

@ -41,43 +41,26 @@ import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.reporting.InitializationException;
/**
* Implementation of for Database Connection Pooling Service.
* Apache DBCP is used for connection pooling functionality.
* Implementation of for Database Connection Pooling Service. Apache DBCP is used for connection pooling functionality.
*
*/
@Tags({"dbcp", "jdbc", "database", "connection", "pooling", "store"})
@Tags({ "dbcp", "jdbc", "database", "connection", "pooling", "store" })
@CapabilityDescription("Provides Database Connection Pooling Service. Connections can be asked from pool and returned after usage.")
public class DBCPConnectionPool extends AbstractControllerService implements DBCPService {
public static final DatabaseSystemDescriptor DEFAULT_DATABASE_SYSTEM = DatabaseSystems.getDescriptor("JavaDB");
public static final PropertyDescriptor DATABASE_SYSTEM = new PropertyDescriptor.Builder()
.name("Database Type")
.description("Database management system")
.allowableValues(DatabaseSystems.knownDatabaseSystems)
.defaultValue(DEFAULT_DATABASE_SYSTEM.getValue())
.required(true)
.build();
public static final PropertyDescriptor DB_HOST = new PropertyDescriptor.Builder()
.name("Database Host")
.description("Database Host")
public static final PropertyDescriptor DATABASE_URL = new PropertyDescriptor.Builder()
.name("Database Connection URL")
.description("A database connection URL used to connect to a database. May contain database system name, host, port, database name and some parameters."
+ " The exact syntax of a database connection URL is specified by your DBMS.")
.defaultValue(null)
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor DB_PORT = new PropertyDescriptor.Builder()
.name("Database Port")
.description("Database server port")
.required(true)
.addValidator(StandardValidators.PORT_VALIDATOR)
.build();
public static final PropertyDescriptor DB_DRIVERNAME = new PropertyDescriptor.Builder()
.name("Database Driver Class Name")
.description("Database driver class name")
.defaultValue(DEFAULT_DATABASE_SYSTEM.driverClassName)
.defaultValue(null)
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
@ -90,14 +73,6 @@ public class DBCPConnectionPool extends AbstractControllerService implements DBC
.addValidator(StandardValidators.URL_VALIDATOR)
.build();
public static final PropertyDescriptor DB_NAME = new PropertyDescriptor.Builder()
.name("Database Name")
.description("Database name")
.defaultValue(null)
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor DB_USER = new PropertyDescriptor.Builder()
.name("Database User")
.description("Database user name")
@ -110,9 +85,9 @@ public class DBCPConnectionPool extends AbstractControllerService implements DBC
.name("Password")
.description("The password for the database user")
.defaultValue(null)
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.required(false)
.sensitive(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor MAX_WAIT_TIME = new PropertyDescriptor.Builder()
@ -138,13 +113,10 @@ public class DBCPConnectionPool extends AbstractControllerService implements DBC
private static final List<PropertyDescriptor> properties;
static {
List<PropertyDescriptor> props = new ArrayList<>();
props.add(DATABASE_SYSTEM);
props.add(DB_HOST);
props.add(DB_PORT);
final List<PropertyDescriptor> props = new ArrayList<>();
props.add(DATABASE_URL);
props.add(DB_DRIVERNAME);
props.add(DB_DRIVER_JAR_URL);
props.add(DB_NAME);
props.add(DB_USER);
props.add(DB_PASSWORD);
props.add(MAX_WAIT_TIME);
@ -162,30 +134,29 @@ public class DBCPConnectionPool extends AbstractControllerService implements DBC
/**
* Create new pool, open some connections ready to be used
* @param context the configuration context
* @throws InitializationException if unable to create a database connection
*
* @param context
* the configuration context
* @throws InitializationException
* if unable to create a database connection
*/
@OnEnabled
public void onConfigured(final ConfigurationContext context) throws InitializationException {
DatabaseSystemDescriptor dbsystem = DatabaseSystems.getDescriptor( context.getProperty(DATABASE_SYSTEM).getValue() );
String host = context.getProperty(DB_HOST).getValue();
Integer port = context.getProperty(DB_PORT).asInteger();
String drv = context.getProperty(DB_DRIVERNAME).getValue();
String dbname = context.getProperty(DB_NAME).getValue();
String user = context.getProperty(DB_USER).getValue();
String passw = context.getProperty(DB_PASSWORD).getValue();
Long maxWaitMillis = context.getProperty(MAX_WAIT_TIME).asTimePeriod(TimeUnit.MILLISECONDS);
Integer maxTotal = context.getProperty(MAX_TOTAL_CONNECTIONS).asInteger();
final String drv = context.getProperty(DB_DRIVERNAME).getValue();
final String user = context.getProperty(DB_USER).getValue();
final String passw = context.getProperty(DB_PASSWORD).getValue();
final Long maxWaitMillis = context.getProperty(MAX_WAIT_TIME).asTimePeriod(TimeUnit.MILLISECONDS);
final Integer maxTotal = context.getProperty(MAX_TOTAL_CONNECTIONS).asInteger();
dataSource = new BasicDataSource();
dataSource.setDriverClassName(drv);
// Optional driver URL, when exist, this URL will be used to locate driver jar file location
String urlString = context.getProperty(DB_DRIVER_JAR_URL).getValue();
dataSource.setDriverClassLoader( getDriverClassLoader(urlString, drv) );
final String urlString = context.getProperty(DB_DRIVER_JAR_URL).getValue();
dataSource.setDriverClassLoader(getDriverClassLoader(urlString, drv));
String dburl = dbsystem.buildUrl(host, port, dbname);
final String dburl = context.getProperty(DATABASE_URL).getValue();
dataSource.setMaxWait(maxWaitMillis);
dataSource.setMaxActive(maxTotal);
@ -196,40 +167,41 @@ public class DBCPConnectionPool extends AbstractControllerService implements DBC
// verify connection can be established.
try {
Connection con = dataSource.getConnection();
if (con==null) {
final Connection con = dataSource.getConnection();
if (con == null) {
throw new InitializationException("Connection to database cannot be established.");
}
con.close();
} catch (SQLException e) {
} catch (final SQLException e) {
throw new InitializationException(e);
}
}
/**
* using Thread.currentThread().getContextClassLoader();
* will ensure that you are using the ClassLoader for you NAR.
* @throws InitializationException if there is a problem obtaining the ClassLoader
* using Thread.currentThread().getContextClassLoader(); will ensure that you are using the ClassLoader for you NAR.
*
* @throws InitializationException
* if there is a problem obtaining the ClassLoader
*/
protected ClassLoader getDriverClassLoader(String urlString, String drvName) throws InitializationException {
if (urlString!=null && urlString.length()>0) {
if (urlString != null && urlString.length() > 0) {
try {
URL[] urls = new URL[] { new URL(urlString) };
URLClassLoader ucl = new URLClassLoader(urls);
final URL[] urls = new URL[] { new URL(urlString) };
final URLClassLoader ucl = new URLClassLoader(urls);
// Workaround which allows to use URLClassLoader for JDBC driver loading.
// (Because the DriverManager will refuse to use a driver not loaded by the system ClassLoader.)
Class<?> clazz = Class.forName(drvName, true, ucl);
if (clazz==null) {
final Class<?> clazz = Class.forName(drvName, true, ucl);
if (clazz == null) {
throw new InitializationException("Can't load Database Driver " + drvName);
}
Driver driver = (Driver) clazz.newInstance();
DriverManager.registerDriver( new DriverShim(driver) );
final Driver driver = (Driver) clazz.newInstance();
DriverManager.registerDriver(new DriverShim(driver));
return ucl;
} catch (MalformedURLException e) {
} catch (final MalformedURLException e) {
throw new InitializationException("Invalid Database Driver Jar Url", e);
} catch (Exception e) {
} catch (final Exception e) {
throw new InitializationException("Can't load Database Driver", e);
}
} else {
@ -245,18 +217,17 @@ public class DBCPConnectionPool extends AbstractControllerService implements DBC
public void shutdown() {
try {
dataSource.close();
} catch (SQLException e) {
} catch (final SQLException e) {
throw new ProcessException(e);
}
}
@Override
public Connection getConnection() throws ProcessException {
try {
Connection con = dataSource.getConnection();
final Connection con = dataSource.getConnection();
return con;
} catch (SQLException e) {
} catch (final SQLException e) {
throw new ProcessException(e);
}
}

View File

@ -24,6 +24,7 @@ import org.apache.nifi.components.AllowableValue;
* An immutable object for holding information about a database system.
*
*/
@Deprecated
public class DatabaseSystemDescriptor extends AllowableValue {
public final String driverClassName;

View File

@ -16,6 +16,7 @@
*/
package org.apache.nifi.dbcp;
@Deprecated
public class DatabaseSystems {
/**

View File

@ -16,7 +16,6 @@
*/
package org.apache.nifi.dbcp;
import static org.apache.nifi.dbcp.DatabaseSystems.getDescriptor;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
@ -53,21 +52,6 @@ public class DBCPServiceTest {
System.setProperty("derby.stream.error.file", "target/derby.log");
}
/**
* Unknown database system.
*
*/
@Test
public void testUnknownDatabaseSystem() throws InitializationException {
final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class);
final DBCPConnectionPool service = new DBCPConnectionPool();
final Map<String, String> properties = new HashMap<String, String>();
properties.put(DBCPConnectionPool.DATABASE_SYSTEM.getName(), "garbage");
runner.addControllerService("test-bad2", service, properties);
runner.assertNotValid(service);
}
/**
* Missing property values.
*/
@ -81,8 +65,7 @@ public class DBCPServiceTest {
}
/**
* Test database connection using Derby.
* Connect, create table, insert, select, drop table.
* Test database connection using Derby. Connect, create table, insert, select, drop table.
*
*/
@Test
@ -92,25 +75,21 @@ public class DBCPServiceTest {
runner.addControllerService("test-good1", service);
// remove previous test database, if any
File dbLocation = new File(DB_LOCATION);
final File dbLocation = new File(DB_LOCATION);
dbLocation.delete();
// Should setProperty call also generate DBCPConnectionPool.onPropertyModified() method call?
// It does not currently.
// Some properties already should have JavaDB/Derby default values, let's set only missing values.
runner.setProperty(service, DBCPConnectionPool.DB_HOST, "NA"); // Embedded Derby don't use host
runner.setProperty(service, DBCPConnectionPool.DB_PORT, "1"); // Embedded Derby don't use port, but must have value anyway
runner.setProperty(service, DBCPConnectionPool.DB_NAME, DB_LOCATION);
// set embedded Derby database connection url
runner.setProperty(service, DBCPConnectionPool.DATABASE_URL, "jdbc:derby:" + DB_LOCATION + ";create=true");
runner.setProperty(service, DBCPConnectionPool.DB_USER, "tester");
runner.setProperty(service, DBCPConnectionPool.DB_PASSWORD, "testerp");
runner.setProperty(service, DBCPConnectionPool.DB_DRIVERNAME, "org.apache.derby.jdbc.EmbeddedDriver");
runner.enableControllerService(service);
runner.assertValid(service);
DBCPService dbcpService = (DBCPService) runner.getProcessContext().getControllerServiceLookup().getControllerService("test-good1");
final DBCPService dbcpService = (DBCPService) runner.getProcessContext().getControllerServiceLookup().getControllerService("test-good1");
Assert.assertNotNull(dbcpService);
Connection connection = dbcpService.getConnection();
final Connection connection = dbcpService.getConnection();
Assert.assertNotNull(connection);
createInsertSelectDrop(connection);
@ -119,12 +98,9 @@ public class DBCPServiceTest {
}
/**
* NB!!!!
* Prerequisite: file should be present in /var/tmp/mariadb-java-client-1.1.7.jar
* Prerequisite: access to running MariaDb database server
* NB!!!! Prerequisite: file should be present in /var/tmp/mariadb-java-client-1.1.7.jar Prerequisite: access to running MariaDb database server
*
* Test database connection using external JDBC jar located by URL.
* Connect, create table, insert, select, drop table.
* Test database connection using external JDBC jar located by URL. Connect, create table, insert, select, drop table.
*
*/
@Ignore
@ -134,27 +110,20 @@ public class DBCPServiceTest {
final DBCPConnectionPool service = new DBCPConnectionPool();
runner.addControllerService("test-external-jar", service);
DatabaseSystemDescriptor mariaDb = getDescriptor("MariaDB");
assertNotNull(mariaDb);
// Set MariaDB properties values.
runner.setProperty(service, DBCPConnectionPool.DATABASE_SYSTEM, mariaDb.getValue());
runner.setProperty(service, DBCPConnectionPool.DB_PORT, mariaDb.defaultPort.toString());
runner.setProperty(service, DBCPConnectionPool.DB_DRIVERNAME, mariaDb.driverClassName);
// set MariaDB database connection url
runner.setProperty(service, DBCPConnectionPool.DATABASE_URL, "jdbc:mariadb://localhost:3306/" + "testdb");
runner.setProperty(service, DBCPConnectionPool.DB_DRIVERNAME, "org.mariadb.jdbc.Driver");
runner.setProperty(service, DBCPConnectionPool.DB_DRIVER_JAR_URL, "file:///var/tmp/mariadb-java-client-1.1.7.jar");
runner.setProperty(service, DBCPConnectionPool.DB_HOST, "localhost"); // localhost
runner.setProperty(service, DBCPConnectionPool.DB_NAME, "testdb");
runner.setProperty(service, DBCPConnectionPool.DB_USER, "tester");
runner.setProperty(service, DBCPConnectionPool.DB_PASSWORD, "testerp");
runner.enableControllerService(service);
runner.assertValid(service);
DBCPService dbcpService = (DBCPService) runner.getProcessContext().getControllerServiceLookup().getControllerService("test-external-jar");
final DBCPService dbcpService = (DBCPService) runner.getProcessContext().getControllerServiceLookup().getControllerService("test-external-jar");
Assert.assertNotNull(dbcpService);
Connection connection = dbcpService.getConnection();
final Connection connection = dbcpService.getConnection();
Assert.assertNotNull(connection);
createInsertSelectDrop(connection);
@ -162,14 +131,11 @@ public class DBCPServiceTest {
connection.close(); // return to pool
}
@Rule
public ExpectedException exception = ExpectedException.none();
/**
* Test get database connection using Derby.
* Get many times, after a while pool should not contain any available connection
* and getConnection should fail.
* Test get database connection using Derby. Get many times, after a while pool should not contain any available connection and getConnection should fail.
*/
@Test
public void testExhaustPool() throws InitializationException, SQLException {
@ -178,33 +144,30 @@ public class DBCPServiceTest {
runner.addControllerService("test-exhaust", service);
// remove previous test database, if any
File dbLocation = new File(DB_LOCATION);
final File dbLocation = new File(DB_LOCATION);
dbLocation.delete();
runner.setProperty(service, DBCPConnectionPool.DB_HOST, "NA"); // Embedded Derby don't use host
runner.setProperty(service, DBCPConnectionPool.DB_PORT, "1"); // Embedded Derby don't use port, but must have value anyway
runner.setProperty(service, DBCPConnectionPool.DB_NAME, DB_LOCATION);
// set embedded Derby database connection url
runner.setProperty(service, DBCPConnectionPool.DATABASE_URL, "jdbc:derby:" + DB_LOCATION + ";create=true");
runner.setProperty(service, DBCPConnectionPool.DB_USER, "tester");
runner.setProperty(service, DBCPConnectionPool.DB_PASSWORD, "testerp");
runner.setProperty(service, DBCPConnectionPool.DB_DRIVERNAME, "org.apache.derby.jdbc.EmbeddedDriver");
runner.enableControllerService(service);
runner.assertValid(service);
DBCPService dbcpService = (DBCPService) runner.getProcessContext().getControllerServiceLookup().getControllerService("test-exhaust");
final DBCPService dbcpService = (DBCPService) runner.getProcessContext().getControllerServiceLookup().getControllerService("test-exhaust");
Assert.assertNotNull(dbcpService);
exception.expect(ProcessException.class);
exception.expectMessage("Cannot get a connection, pool error Timeout waiting for idle object");
for (int i = 0; i < 100; i++) {
Connection connection = dbcpService.getConnection();
final Connection connection = dbcpService.getConnection();
Assert.assertNotNull(connection);
}
}
/**
* Test get database connection using Derby.
* Get many times, release immediately
* and getConnection should not fail.
* Test get database connection using Derby. Get many times, release immediately and getConnection should not fail.
*/
@Test
public void testGetManyNormal() throws InitializationException, SQLException {
@ -213,103 +176,98 @@ public class DBCPServiceTest {
runner.addControllerService("test-exhaust", service);
// remove previous test database, if any
File dbLocation = new File(DB_LOCATION);
final File dbLocation = new File(DB_LOCATION);
dbLocation.delete();
runner.setProperty(service, DBCPConnectionPool.DB_HOST, "NA"); // Embedded Derby don't use host
runner.setProperty(service, DBCPConnectionPool.DB_PORT, "1"); // Embedded Derby don't use port, but must have value anyway
runner.setProperty(service, DBCPConnectionPool.DB_NAME, DB_LOCATION);
// set embedded Derby database connection url
runner.setProperty(service, DBCPConnectionPool.DATABASE_URL, "jdbc:derby:" + DB_LOCATION + ";create=true");
runner.setProperty(service, DBCPConnectionPool.DB_USER, "tester");
runner.setProperty(service, DBCPConnectionPool.DB_PASSWORD, "testerp");
runner.setProperty(service, DBCPConnectionPool.DB_DRIVERNAME, "org.apache.derby.jdbc.EmbeddedDriver");
runner.enableControllerService(service);
runner.assertValid(service);
DBCPService dbcpService = (DBCPService) runner.getProcessContext().getControllerServiceLookup().getControllerService("test-exhaust");
final DBCPService dbcpService = (DBCPService) runner.getProcessContext().getControllerServiceLookup().getControllerService("test-exhaust");
Assert.assertNotNull(dbcpService);
for (int i = 0; i < 1000; i++) {
Connection connection = dbcpService.getConnection();
final Connection connection = dbcpService.getConnection();
Assert.assertNotNull(connection);
connection.close(); // will return connection to pool
}
}
@Test
public void testDriverLoad() throws ClassNotFoundException {
Class<?> clazz = Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
final Class<?> clazz = Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
assertNotNull(clazz);
}
/**
* NB!!!!
* Prerequisite: file should be present in /var/tmp/mariadb-java-client-1.1.7.jar
* NB!!!! Prerequisite: file should be present in /var/tmp/mariadb-java-client-1.1.7.jar
*/
@Test
@Ignore("Intended only for local testing, not automated testing")
public void testURLClassLoader() throws ClassNotFoundException, MalformedURLException, SQLException, InstantiationException, IllegalAccessException {
URL url = new URL("file:///var/tmp/mariadb-java-client-1.1.7.jar");
URL[] urls = new URL[] { url };
final URL url = new URL("file:///var/tmp/mariadb-java-client-1.1.7.jar");
final URL[] urls = new URL[] { url };
ClassLoader parent = Thread.currentThread().getContextClassLoader();
URLClassLoader ucl = new URLClassLoader(urls,parent);
final ClassLoader parent = Thread.currentThread().getContextClassLoader();
final URLClassLoader ucl = new URLClassLoader(urls, parent);
Class<?> clazz = Class.forName("org.mariadb.jdbc.Driver", true, ucl);
final Class<?> clazz = Class.forName("org.mariadb.jdbc.Driver", true, ucl);
assertNotNull(clazz);
Driver driver = (Driver) clazz.newInstance();
Driver shim = new DriverShim(driver);
DriverManager.registerDriver( shim );
final Driver driver = (Driver) clazz.newInstance();
final Driver shim = new DriverShim(driver);
DriverManager.registerDriver(shim);
Driver driver2 = DriverManager.getDriver("jdbc:mariadb://localhost:3306/testdb");
final Driver driver2 = DriverManager.getDriver("jdbc:mariadb://localhost:3306/testdb");
assertNotNull(driver2);
}
/**
* NB!!!!
* Prerequisite: file should be present in /var/tmp/mariadb-java-client-1.1.7.jar
* Prerequisite: access to running MariaDb database server
* NB!!!! Prerequisite: file should be present in /var/tmp/mariadb-java-client-1.1.7.jar Prerequisite: access to running MariaDb database server
*/
@Test
@Ignore("Intended only for local testing, not automated testing")
public void testURLClassLoaderGetConnection() throws ClassNotFoundException, MalformedURLException, SQLException, InstantiationException, IllegalAccessException {
URL url = new URL("file:///var/tmp/mariadb-java-client-1.1.7.jar");
URL[] urls = new URL[] { url };
final URL url = new URL("file:///var/tmp/mariadb-java-client-1.1.7.jar");
final URL[] urls = new URL[] { url };
ClassLoader parent = Thread.currentThread().getContextClassLoader();
URLClassLoader ucl = new URLClassLoader(urls,parent);
final ClassLoader parent = Thread.currentThread().getContextClassLoader();
final URLClassLoader ucl = new URLClassLoader(urls, parent);
Class<?> clazz = Class.forName("org.mariadb.jdbc.Driver", true, ucl);
final Class<?> clazz = Class.forName("org.mariadb.jdbc.Driver", true, ucl);
assertNotNull(clazz);
Driver driver = (Driver) clazz.newInstance();
Driver shim = new DriverShim(driver);
DriverManager.registerDriver( shim );
final Driver driver = (Driver) clazz.newInstance();
final Driver shim = new DriverShim(driver);
DriverManager.registerDriver(shim);
Driver driver2 = DriverManager.getDriver("jdbc:mariadb://localhost:3306/testdb");
final Driver driver2 = DriverManager.getDriver("jdbc:mariadb://localhost:3306/testdb");
assertNotNull(driver2);
Connection connection = DriverManager.getConnection("jdbc:mariadb://localhost:3306/testdb","tester","testerp");
final Connection connection = DriverManager.getConnection("jdbc:mariadb://localhost:3306/testdb", "tester", "testerp");
assertNotNull(connection);
connection.close();
DriverManager.deregisterDriver(shim);
}
String createTable = "create table restaurants(id integer, name varchar(20), city varchar(50))";
String dropTable = "drop table restaurants";
protected void createInsertSelectDrop( Connection con) throws SQLException {
protected void createInsertSelectDrop(Connection con) throws SQLException {
Statement st = con.createStatement();
final Statement st = con.createStatement();
try {
st.executeUpdate(dropTable);
} catch (Exception e) {
} catch (final Exception e) {
// table may not exist, this is not serious problem.
}
@ -320,7 +278,7 @@ public class DBCPServiceTest {
st.executeUpdate("insert into restaurants values (3, 'Prime Rib House', 'San Francisco')");
int nrOfRows = 0;
ResultSet resultSet = st.executeQuery("select * from restaurants");
final ResultSet resultSet = st.executeQuery("select * from restaurants");
while (resultSet.next())
nrOfRows++;
assertEquals(3, nrOfRows);

View File

@ -847,11 +847,21 @@
<artifactId>nifi-dbcp-service</artifactId>
<version>0.2.0-incubating-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-dbcp-service-api</artifactId>
<version>0.2.0-incubating-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.jayway.jsonpath</groupId>
<artifactId>json-path</artifactId>
<version>2.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.derby</groupId>
<artifactId>derby</artifactId>
<version>10.11.1.1</version>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>